Diffstat (limited to 'test/metadata_subscribe/metadata_subscribe_integration_test.go')
-rw-r--r--  test/metadata_subscribe/metadata_subscribe_integration_test.go  917
1 file changed, 917 insertions, 0 deletions
diff --git a/test/metadata_subscribe/metadata_subscribe_integration_test.go b/test/metadata_subscribe/metadata_subscribe_integration_test.go
new file mode 100644
index 000000000..9d8b7c39b
--- /dev/null
+++ b/test/metadata_subscribe/metadata_subscribe_integration_test.go
@@ -0,0 +1,917 @@
+package metadata_subscribe
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+// TestMetadataSubscribeBasic tests basic metadata subscription functionality
+func TestMetadataSubscribeBasic(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_metadata_subscribe_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("SeaweedFS cluster started successfully")
+
+ t.Run("subscribe_and_receive_events", func(t *testing.T) {
+ // Create a channel to receive events
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+ errChan := make(chan error, 1)
+
+ // Start subscribing in a goroutine
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context canceled") {
+ errChan <- err
+ }
+ }()
+
+ // Wait for subscription to be established
+ time.Sleep(2 * time.Second)
+
+ // Create test files via HTTP
+ testFiles := []string{
+ "/test/file1.txt",
+ "/test/file2.txt",
+ "/test/subdir/file3.txt",
+ }
+
+ for _, path := range testFiles {
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte("test content for "+path))
+ require.NoError(t, err, "Failed to upload %s", path)
+ t.Logf("Uploaded %s", path)
+ }
+
+ // Collect events with timeout
+ receivedPaths := make(map[string]bool)
+ timeout := time.After(30 * time.Second)
+
+ eventLoop:
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ path := filepath.Join(event.Directory, event.EventNotification.NewEntry.Name)
+ t.Logf("Received event for: %s", path)
+ receivedPaths[path] = true
+ }
+ // Check if we received all expected events
+ allReceived := true
+ for _, p := range testFiles {
+ if !receivedPaths[p] {
+ allReceived = false
+ break
+ }
+ }
+ if allReceived {
+ break eventLoop
+ }
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ case <-timeout:
+ t.Logf("Timeout waiting for events. Received %d/%d events", len(receivedPaths), len(testFiles))
+ break eventLoop
+ }
+ }
+
+ // Verify we received events for all test files
+ for _, path := range testFiles {
+ assert.True(t, receivedPaths[path], "Should have received event for %s", path)
+ }
+ })
+}
+
+// TestMetadataSubscribeSingleFilerNoStall tests that subscription doesn't stall
+// in single-filer setups (regression test for issue #4977)
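+// Before the fix, the subscriber could stall at roughly 20-40% of the events under sustained writes.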
+func TestMetadataSubscribeSingleFilerNoStall(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_single_filer_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Single-filer cluster started")
+
+ t.Run("high_load_subscription_no_stall", func(t *testing.T) {
+ // This test simulates the scenario from issue #4977:
+ // High-load writes while a subscriber tries to keep up
+
+ var receivedCount int64
+ var uploadedCount int64
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 1000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context canceled") {
+ errChan <- err
+ }
+ }()
+
+ // Wait for subscription to be established
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Upload files concurrently (simulate high load)
+ numFiles := 100
+ numWorkers := 10
+
+ uploadWg := sync.WaitGroup{}
+ for w := 0; w < numWorkers; w++ {
+ uploadWg.Add(1)
+ go func(workerId int) {
+ defer uploadWg.Done()
+ for i := 0; i < numFiles/numWorkers; i++ {
+ path := fmt.Sprintf("/load_test/worker%d/file%d.txt", workerId, i)
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d-%d", workerId, i)))
+ if err == nil {
+ atomic.AddInt64(&uploadedCount, 1)
+ }
+ }
+ }(w)
+ }
+
+ uploadWg.Wait()
+ uploaded := atomic.LoadInt64(&uploadedCount)
+ t.Logf("Uploaded %d files", uploaded)
+
+ // Wait for events to be received (with timeout to detect stall)
+ stallTimeout := time.After(60 * time.Second)
+ checkInterval := time.NewTicker(2 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := atomic.LoadInt64(&receivedCount)
+ staleCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-stallTimeout:
+ received := atomic.LoadInt64(&receivedCount)
+ t.Logf("Timeout: received %d/%d events (%.1f%%)",
+ received, uploaded, float64(received)/float64(uploaded)*100)
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= uploaded {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ staleCount++
+ if staleCount >= 5 {
+ // If no progress for 10 seconds, subscription may be stalled
+ t.Logf("WARNING: No progress for %d checks. Received %d/%d (%.1f%%)",
+ staleCount, received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ } else {
+ staleCount = 0
+ t.Logf("Progress: received %d/%d events (%.1f%%)",
+ received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+
+ // With the fix for #4977, we should receive a high percentage of events
+ // Before the fix, this would stall at ~20-40%
+ percentage := float64(received) / float64(uploaded) * 100
+ t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+ // We should receive at least 80% of events (allowing for some timing issues)
+ assert.GreaterOrEqual(t, percentage, 80.0,
+ "Should receive at least 80%% of events (received %.1f%%)", percentage)
+ })
+}
+
+// TestMetadataSubscribeResumeFromDisk tests that subscription can resume from disk
+func TestMetadataSubscribeResumeFromDisk(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_resume_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Run("upload_before_subscribe", func(t *testing.T) {
+ // Upload files BEFORE starting subscription
+ numFiles := 20
+ for i := 0; i < numFiles; i++ {
+ path := fmt.Sprintf("/pre_subscribe/file%d.txt", i)
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d", i)))
+ require.NoError(t, err)
+ }
+ t.Logf("Uploaded %d files before subscription", numFiles)
+
+ // Wait for logs to be flushed to disk
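+ // so that the subscription below must replay events from persisted log files rather than the in-memory buffer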
+ time.Sleep(15 * time.Second)
+
+ // Now start subscription from the beginning
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithTimeout(ctx, 30*time.Second)
+ defer subCancel()
+
+ go func() {
+ err := subscribeToMetadataFromBeginning(subCtx, "127.0.0.1:8888", "/pre_subscribe/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
+ // Count received events
+ receivedCount := 0
+ timeout := time.After(30 * time.Second)
+
+ countLoop:
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ receivedCount++
+ t.Logf("Received event %d: %s/%s", receivedCount,
+ event.Directory, event.EventNotification.NewEntry.Name)
+ }
+ }
+ if receivedCount >= numFiles {
+ break countLoop
+ }
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ case <-timeout:
+ t.Logf("Timeout: received %d/%d events", receivedCount, numFiles)
+ break countLoop
+ }
+ }
+
+ // Should receive all pre-uploaded files from disk
+ assert.GreaterOrEqual(t, receivedCount, numFiles-2, // Allow small margin
+ "Should receive most pre-uploaded files from disk (received %d/%d)", receivedCount, numFiles)
+ })
+}
+
+// TestMetadataSubscribeConcurrentWrites tests subscription with concurrent writes
+func TestMetadataSubscribeConcurrentWrites(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_concurrent_writes_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Cluster started for concurrent writes test")
+
+ t.Run("concurrent_goroutine_writes", func(t *testing.T) {
+ var receivedCount int64
+ var uploadedCount int64
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 10000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/concurrent/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
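+ // Wait for the subscription to be established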
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Launch many concurrent writers
+ numWorkers := 50
+ filesPerWorker := 20
+ totalExpected := int64(numWorkers * filesPerWorker)
+
+ uploadWg := sync.WaitGroup{}
+ for w := 0; w < numWorkers; w++ {
+ uploadWg.Add(1)
+ go func(workerId int) {
+ defer uploadWg.Done()
+ for i := 0; i < filesPerWorker; i++ {
+ path := fmt.Sprintf("/concurrent/w%d/f%d.txt", workerId, i)
+ content := []byte(fmt.Sprintf("worker%d-file%d", workerId, i))
+ if err := uploadFile("http://127.0.0.1:8888"+path, content); err == nil {
+ atomic.AddInt64(&uploadedCount, 1)
+ }
+ }
+ }(w)
+ }
+
+ uploadWg.Wait()
+ uploaded := atomic.LoadInt64(&uploadedCount)
+ t.Logf("Uploaded %d/%d files from %d concurrent workers", uploaded, totalExpected, numWorkers)
+
+ // Wait for events with progress tracking
+ stallTimeout := time.After(90 * time.Second)
+ checkInterval := time.NewTicker(3 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := int64(0)
+ stableCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-stallTimeout:
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= uploaded {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ stableCount++
+ if stableCount >= 5 {
+ t.Logf("No progress for %d checks, received %d/%d", stableCount, received, uploaded)
+ break waitLoop
+ }
+ } else {
+ stableCount = 0
+ t.Logf("Progress: %d/%d (%.1f%%)", received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+ percentage := float64(received) / float64(uploaded) * 100
+ t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+ // Should receive at least 80% of events
+ assert.GreaterOrEqual(t, percentage, 80.0,
+ "Should receive at least 80%% of concurrent write events")
+ })
+}
+
+// TestMetadataSubscribeMillionUpdates tests subscription with 1 million metadata updates
+// This test creates metadata entries directly via gRPC without actual file content
+func TestMetadataSubscribeMillionUpdates(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_million_updates_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Cluster started for million updates test")
+
+ t.Run("million_metadata_updates", func(t *testing.T) {
+ var receivedCount int64
+ var createdCount int64
+ totalEntries := int64(1_000_000)
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/million/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
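+ // Wait for the subscription to be established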
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Create metadata entries directly via gRPC (no actual file content)
+ numWorkers := 100
+ entriesPerWorker := int(totalEntries) / numWorkers
+
+ startTime := time.Now()
+ createWg := sync.WaitGroup{}
+
+ for w := 0; w < numWorkers; w++ {
+ createWg.Add(1)
+ go func(workerId int) {
+ defer createWg.Done()
+ grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+ err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ for i := 0; i < entriesPerWorker; i++ {
+ dir := fmt.Sprintf("/million/bucket%d", workerId%100)
+ name := fmt.Sprintf("entry_%d_%d", workerId, i)
+
+ _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ FileSize: 100,
+ Mtime: time.Now().Unix(),
+ FileMode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ },
+ },
+ })
+ if err == nil {
+ atomic.AddInt64(&createdCount, 1)
+ }
+
+ // Log progress every 10000 entries per worker
+ if i > 0 && i%10000 == 0 {
+ created := atomic.LoadInt64(&createdCount)
+ elapsed := time.Since(startTime)
+ rate := float64(created) / elapsed.Seconds()
+ t.Logf("Worker %d: created %d entries, total %d (%.0f/sec)",
+ workerId, i, created, rate)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ t.Logf("Worker %d error: %v", workerId, err)
+ }
+ }(w)
+ }
+
+ // Progress reporter
+ progressDone := make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ created := atomic.LoadInt64(&createdCount)
+ received := atomic.LoadInt64(&receivedCount)
+ elapsed := time.Since(startTime)
+ createRate := float64(created) / elapsed.Seconds()
+ receiveRate := float64(received) / elapsed.Seconds()
+ t.Logf("Progress: created %d (%.0f/sec), received %d (%.0f/sec), lag %d",
+ created, createRate, received, receiveRate, created-received)
+ case <-progressDone:
+ return
+ }
+ }
+ }()
+
+ createWg.Wait()
+ close(progressDone)
+
+ created := atomic.LoadInt64(&createdCount)
+ elapsed := time.Since(startTime)
+ t.Logf("Created %d entries in %v (%.0f/sec)", created, elapsed, float64(created)/elapsed.Seconds())
+
+ // Wait for subscription to catch up
+ catchupTimeout := time.After(5 * time.Minute)
+ checkInterval := time.NewTicker(5 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := int64(0)
+ stableCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-catchupTimeout:
+ t.Logf("Catchup timeout reached")
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= created {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ stableCount++
+ if stableCount >= 10 {
+ t.Logf("No progress for %d checks", stableCount)
+ break waitLoop
+ }
+ } else {
+ stableCount = 0
+ rate := float64(received-lastReceived) / 5.0
+ t.Logf("Catching up: %d/%d (%.1f%%) at %.0f/sec",
+ received, created, float64(received)/float64(created)*100, rate)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+ percentage := float64(received) / float64(created) * 100
+ totalTime := time.Since(startTime)
+ t.Logf("Final: created %d, received %d (%.1f%%) in %v", created, received, percentage, totalTime)
+
+ // For million entries, we expect at least 90% to be received
+ assert.GreaterOrEqual(t, percentage, 90.0,
+ "Should receive at least 90%% of million metadata events (received %.1f%%)", percentage)
+ })
+}
+
+// Helper types and functions
+
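+// TestCluster holds the weed master, volume server, and filer processes started for a single test run.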
+type TestCluster struct {
+ masterCmd *exec.Cmd
+ volumeCmd *exec.Cmd
+ filerCmd *exec.Cmd
+ testDir string
+}
+
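+// Stop terminates the filer, volume server, and master processes in reverse start order.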
+func (c *TestCluster) Stop() {
+ if c.filerCmd != nil && c.filerCmd.Process != nil {
+ c.filerCmd.Process.Kill()
+ c.filerCmd.Wait()
+ }
+ if c.volumeCmd != nil && c.volumeCmd.Process != nil {
+ c.volumeCmd.Process.Kill()
+ c.volumeCmd.Wait()
+ }
+ if c.masterCmd != nil && c.masterCmd.Process != nil {
+ c.masterCmd.Process.Kill()
+ c.masterCmd.Wait()
+ }
+}
+
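+// startSeaweedFSCluster launches a weed master (port 9333), volume server (port 8080), and filer (port 8888)
+// under dataDir, redirecting each component's output to a log file in its own data directory.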
+func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, error) {
+ weedBinary := findWeedBinary()
+ if weedBinary == "" {
+ return nil, fmt.Errorf("weed binary not found")
+ }
+
+ cluster := &TestCluster{testDir: dataDir}
+
+ // Create directories
+ masterDir := filepath.Join(dataDir, "master")
+ volumeDir := filepath.Join(dataDir, "volume")
+ filerDir := filepath.Join(dataDir, "filer")
+ if err := os.MkdirAll(masterDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create master dir: %v", err)
+ }
+ if err := os.MkdirAll(volumeDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create volume dir: %v", err)
+ }
+ if err := os.MkdirAll(filerDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create filer dir: %v", err)
+ }
+
+ // Start master server
+ masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+ "-port", "9333",
+ "-mdir", masterDir,
+ "-volumeSizeLimitMB", "10",
+ "-ip", "127.0.0.1",
+ "-peers", "none",
+ )
+ masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create master log file: %v", err)
+ }
+ masterCmd.Stdout = masterLogFile
+ masterCmd.Stderr = masterLogFile
+ if err := masterCmd.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start master: %v", err)
+ }
+ cluster.masterCmd = masterCmd
+
+ time.Sleep(2 * time.Second)
+
+ // Start volume server
+ volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+ "-port", "8080",
+ "-dir", volumeDir,
+ "-max", "10",
+ "-master", "127.0.0.1:9333",
+ "-ip", "127.0.0.1",
+ )
+ volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create volume log file: %v", err)
+ }
+ volumeCmd.Stdout = volumeLogFile
+ volumeCmd.Stderr = volumeLogFile
+ if err := volumeCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start volume: %v", err)
+ }
+ cluster.volumeCmd = volumeCmd
+
+ time.Sleep(2 * time.Second)
+
+ // Start filer server
+ filerCmd := exec.CommandContext(ctx, weedBinary, "filer",
+ "-port", "8888",
+ "-master", "127.0.0.1:9333",
+ "-ip", "127.0.0.1",
+ )
+ filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create filer log file: %v", err)
+ }
+ filerCmd.Stdout = filerLogFile
+ filerCmd.Stderr = filerLogFile
+ if err := filerCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start filer: %v", err)
+ }
+ cluster.filerCmd = filerCmd
+
+ time.Sleep(3 * time.Second)
+
+ return cluster, nil
+}
+
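+// findWeedBinary looks for a locally built weed binary in a few relative locations before falling back to $PATH.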
+func findWeedBinary() string {
+ candidates := []string{
+ "../../../weed/weed",
+ "../../weed/weed",
+ "./weed",
+ "weed",
+ }
+ for _, candidate := range candidates {
+ if _, err := os.Stat(candidate); err == nil {
+ return candidate
+ }
+ }
+ if path, err := exec.LookPath("weed"); err == nil {
+ return path
+ }
+ return ""
+}
+
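+// waitForHTTPServer polls url until it returns any HTTP response or the timeout elapses.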
+func waitForHTTPServer(url string, timeout time.Duration) error {
+ start := time.Now()
+ for time.Since(start) < timeout {
+ resp, err := http.Get(url)
+ if err == nil {
+ resp.Body.Close()
+ return nil
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ return fmt.Errorf("timeout waiting for server %s", url)
+}
+
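+// uploadFile POSTs content to the given filer URL as a multipart form upload.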
+func uploadFile(url string, content []byte) error {
+ // Create multipart form data
+ var buf bytes.Buffer
+ writer := multipart.NewWriter(&buf)
+
+ // Extract filename from URL path
+ parts := strings.Split(url, "/")
+ filename := parts[len(parts)-1]
+ if filename == "" {
+ filename = "file.txt"
+ }
+
+ // Create form file field
+ part, err := writer.CreateFormFile("file", filename)
+ if err != nil {
+ return fmt.Errorf("create form file: %v", err)
+ }
+ if _, err := part.Write(content); err != nil {
+ return fmt.Errorf("write content: %v", err)
+ }
+ if err := writer.Close(); err != nil {
+ return fmt.Errorf("close writer: %v", err)
+ }
+
+ req, err := http.NewRequest("POST", url, &buf)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", writer.FormDataContentType())
+
+ client := &http.Client{Timeout: 10 * time.Second}
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode >= 400 {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
+ }
+ return nil
+}
+
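+// subscribeToMetadata subscribes starting from the current time, so only events generated after the call are delivered.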
+func subscribeToMetadata(ctx context.Context, filerAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ return subscribeToMetadataWithOptions(ctx, filerAddress, pathPrefix, time.Now().UnixNano(), eventsChan)
+}
+
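+// subscribeToMetadataFromBeginning subscribes with sinceNs=0 so that previously persisted events are replayed as well.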
+func subscribeToMetadataFromBeginning(ctx context.Context, filerAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ // Start from the Unix epoch to get all events
+ return subscribeToMetadataWithOptions(ctx, filerAddress, pathPrefix, 0, eventsChan)
+}
+
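+// subscribeToMetadataWithOptions opens a SubscribeMetadata stream on the filer and forwards each response to
+// eventsChan until the stream ends or ctx is canceled. Events are dropped (with a warning) if eventsChan stays
+// full for more than 100ms.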
+func subscribeToMetadataWithOptions(ctx context.Context, filerAddress, pathPrefix string, sinceNs int64, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ if grpcDialOption == nil {
+ grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+ }
+
+ return pb.WithFilerClient(false, 0, pb.ServerAddress(filerAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "integration_test",
+ PathPrefix: pathPrefix,
+ SinceNs: sinceNs,
+ ClientId: util.RandomInt32(),
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF || ctx.Err() != nil {
+ return nil
+ }
+ return err
+ }
+
+ select {
+ case eventsChan <- resp:
+ case <-ctx.Done():
+ return nil
+ case <-time.After(100 * time.Millisecond):
+ // Channel still full after a brief wait; drop this event and log a warning
+ glog.Warningf("Event channel full, skipping event for %s", resp.Directory)
+ }
+ }
+ })
+}