Diffstat (limited to 'test/metadata_subscribe/metadata_subscribe_integration_test.go')
| -rw-r--r-- | test/metadata_subscribe/metadata_subscribe_integration_test.go | 917 |
1 files changed, 917 insertions, 0 deletions
diff --git a/test/metadata_subscribe/metadata_subscribe_integration_test.go b/test/metadata_subscribe/metadata_subscribe_integration_test.go
new file mode 100644
index 000000000..9d8b7c39b
--- /dev/null
+++ b/test/metadata_subscribe/metadata_subscribe_integration_test.go
@@ -0,0 +1,917 @@
+package metadata_subscribe
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"mime/multipart"
+	"net/http"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/security"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// TestMetadataSubscribeBasic tests basic metadata subscription functionality
+func TestMetadataSubscribeBasic(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	testDir, err := os.MkdirTemp("", "seaweedfs_metadata_subscribe_test_")
+	require.NoError(t, err)
+	defer os.RemoveAll(testDir)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+	defer cancel()
+
+	cluster, err := startSeaweedFSCluster(ctx, testDir)
+	require.NoError(t, err)
+	defer cluster.Stop()
+
+	// Wait for servers to be ready
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+	t.Logf("SeaweedFS cluster started successfully")
+
+	t.Run("subscribe_and_receive_events", func(t *testing.T) {
+		// Create a channel to receive events
+		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+		errChan := make(chan error, 1)
+
+		// Start subscribing in a goroutine
+		subCtx, subCancel := context.WithCancel(ctx)
+		defer subCancel()
+
+		go func() {
+			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+			if err != nil && !strings.Contains(err.Error(), "context canceled") {
+				errChan <- err
+			}
+		}()
+
+		// Wait for subscription to be established
+		time.Sleep(2 * time.Second)
+
+		// Create test files via HTTP
+		testFiles := []string{
+			"/test/file1.txt",
+			"/test/file2.txt",
+			"/test/subdir/file3.txt",
+		}
+
+		for _, path := range testFiles {
+			err := uploadFile("http://127.0.0.1:8888"+path, []byte("test content for "+path))
+			require.NoError(t, err, "Failed to upload %s", path)
+			t.Logf("Uploaded %s", path)
+		}
+
+		// Collect events with timeout
+		receivedPaths := make(map[string]bool)
+		timeout := time.After(30 * time.Second)
+
+	eventLoop:
+		for {
+			select {
+			case event := <-eventsChan:
+				if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+					path := filepath.Join(event.Directory, event.EventNotification.NewEntry.Name)
+					t.Logf("Received event for: %s", path)
+					receivedPaths[path] = true
+				}
+				// Check if we received all expected events
+				allReceived := true
+				for _, p := range testFiles {
+					if !receivedPaths[p] {
+						allReceived = false
+						break
+					}
+				}
+				if allReceived {
+					break eventLoop
+				}
+			case err := <-errChan:
+				t.Fatalf("Subscription error: %v", err)
+			case <-timeout:
+				t.Logf("Timeout waiting for events. Received %d/%d events", len(receivedPaths), len(testFiles))
+				break eventLoop
+			}
+		}
+
+		// Verify we received events for all test files
+		for _, path := range testFiles {
+			assert.True(t, receivedPaths[path], "Should have received event for %s", path)
+		}
+	})
+}
+
+// TestMetadataSubscribeSingleFilerNoStall tests that subscription doesn't stall
+// in single-filer setups (regression test for issue #4977)
+func TestMetadataSubscribeSingleFilerNoStall(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	testDir, err := os.MkdirTemp("", "seaweedfs_single_filer_test_")
+	require.NoError(t, err)
+	defer os.RemoveAll(testDir)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+	defer cancel()
+
+	cluster, err := startSeaweedFSCluster(ctx, testDir)
+	require.NoError(t, err)
+	defer cluster.Stop()
+
+	// Wait for servers to be ready
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+	t.Logf("Single-filer cluster started")
+
+	t.Run("high_load_subscription_no_stall", func(t *testing.T) {
+		// This test simulates the scenario from issue #4977:
+		// High-load writes while a subscriber tries to keep up
+
+		var receivedCount int64
+		var uploadedCount int64
+
+		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 1000)
+		errChan := make(chan error, 1)
+
+		subCtx, subCancel := context.WithCancel(ctx)
+		defer subCancel()
+
+		// Start subscriber
+		go func() {
+			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+			if err != nil && !strings.Contains(err.Error(), "context canceled") {
+				errChan <- err
+			}
+		}()
+
+		// Wait for subscription to be established
+		time.Sleep(2 * time.Second)
+
+		// Start counting received events
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case event := <-eventsChan:
+					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+						if !event.EventNotification.NewEntry.IsDirectory {
+							atomic.AddInt64(&receivedCount, 1)
+						}
+					}
+				case <-subCtx.Done():
+					return
+				}
+			}
+		}()
+
+		// Upload files concurrently (simulate high load)
+		numFiles := 100
+		numWorkers := 10
+
+		uploadWg := sync.WaitGroup{}
+		for w := 0; w < numWorkers; w++ {
+			uploadWg.Add(1)
+			go func(workerId int) {
+				defer uploadWg.Done()
+				for i := 0; i < numFiles/numWorkers; i++ {
+					path := fmt.Sprintf("/load_test/worker%d/file%d.txt", workerId, i)
+					err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d-%d", workerId, i)))
+					if err == nil {
+						atomic.AddInt64(&uploadedCount, 1)
+					}
+				}
+			}(w)
+		}
+
+		uploadWg.Wait()
+		uploaded := atomic.LoadInt64(&uploadedCount)
+		t.Logf("Uploaded %d files", uploaded)
+
+		// Wait for events to be received (with timeout to detect stall)
+		stallTimeout := time.After(60 * time.Second)
+		checkInterval := time.NewTicker(2 * time.Second)
+		defer checkInterval.Stop()
+
+		lastReceived := atomic.LoadInt64(&receivedCount)
+		staleCount := 0
+
+	waitLoop:
+		for {
+			select {
+			case <-stallTimeout:
+				received := atomic.LoadInt64(&receivedCount)
+				t.Logf("Timeout: received %d/%d events (%.1f%%)",
+					received, uploaded, float64(received)/float64(uploaded)*100)
+				break waitLoop
+			case <-checkInterval.C:
+				received := atomic.LoadInt64(&receivedCount)
+				if received >= uploaded {
+					t.Logf("All %d events received", received)
+					break waitLoop
+				}
+				if received == lastReceived {
+					staleCount++
+					if staleCount >= 5 {
+						// If no progress for 10 seconds, subscription may be stalled
+						t.Logf("WARNING: No progress for %d checks. Received %d/%d (%.1f%%)",
+							staleCount, received, uploaded, float64(received)/float64(uploaded)*100)
+					}
+				} else {
+					staleCount = 0
+					t.Logf("Progress: received %d/%d events (%.1f%%)",
+						received, uploaded, float64(received)/float64(uploaded)*100)
+				}
+				lastReceived = received
+			case err := <-errChan:
+				t.Fatalf("Subscription error: %v", err)
+			}
+		}
+
+		subCancel()
+		wg.Wait()
+
+		received := atomic.LoadInt64(&receivedCount)
+
+		// With the fix for #4977, we should receive a high percentage of events
+		// Before the fix, this would stall at ~20-40%
+		percentage := float64(received) / float64(uploaded) * 100
+		t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+		// We should receive at least 80% of events (allowing for some timing issues)
+		assert.GreaterOrEqual(t, percentage, 80.0,
+			"Should receive at least 80%% of events (received %.1f%%)", percentage)
+	})
+}
+
+// TestMetadataSubscribeResumeFromDisk tests that subscription can resume from disk
+func TestMetadataSubscribeResumeFromDisk(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	testDir, err := os.MkdirTemp("", "seaweedfs_resume_test_")
+	require.NoError(t, err)
+	defer os.RemoveAll(testDir)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+	defer cancel()
+
+	cluster, err := startSeaweedFSCluster(ctx, testDir)
+	require.NoError(t, err)
+	defer cluster.Stop()
+
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+	t.Run("upload_before_subscribe", func(t *testing.T) {
+		// Upload files BEFORE starting subscription
+		numFiles := 20
+		for i := 0; i < numFiles; i++ {
+			path := fmt.Sprintf("/pre_subscribe/file%d.txt", i)
+			err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d", i)))
+			require.NoError(t, err)
+		}
+		t.Logf("Uploaded %d files before subscription", numFiles)
+
+		// Wait for logs to be flushed to disk
+		time.Sleep(15 * time.Second)
+
+		// Now start subscription from the beginning
+		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+		errChan := make(chan error, 1)
+
+		subCtx, subCancel := context.WithTimeout(ctx, 30*time.Second)
+		defer subCancel()
+
+		go func() {
+			err := subscribeToMetadataFromBeginning(subCtx, "127.0.0.1:8888", "/pre_subscribe/", eventsChan)
+			if err != nil && !strings.Contains(err.Error(), "context") {
+				errChan <- err
+			}
+		}()
+
+		// Count received events
+		receivedCount := 0
+		timeout := time.After(30 * time.Second)
+
+	countLoop:
+		for {
+			select {
+			case event := <-eventsChan:
+				if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+					if !event.EventNotification.NewEntry.IsDirectory {
+						receivedCount++
+						t.Logf("Received event %d: %s/%s", receivedCount,
+							event.Directory, event.EventNotification.NewEntry.Name)
+					}
+				}
+				if receivedCount >= numFiles {
+					break countLoop
+				}
+			case err := <-errChan:
+				t.Fatalf("Subscription error: %v", err)
+			case <-timeout:
+				t.Logf("Timeout: received %d/%d events", receivedCount, numFiles)
+				break countLoop
+			}
+		}
+
+		// Should receive all pre-uploaded files from disk
+		assert.GreaterOrEqual(t, receivedCount, numFiles-2, // Allow small margin
+			"Should receive most pre-uploaded files from disk (received %d/%d)", receivedCount, numFiles)
+	})
+}
+
+// TestMetadataSubscribeConcurrentWrites tests subscription with concurrent writes
+func TestMetadataSubscribeConcurrentWrites(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	testDir, err := os.MkdirTemp("", "seaweedfs_concurrent_writes_test_")
+	require.NoError(t, err)
+	defer os.RemoveAll(testDir)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+	defer cancel()
+
+	cluster, err := startSeaweedFSCluster(ctx, testDir)
+	require.NoError(t, err)
+	defer cluster.Stop()
+
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+	t.Logf("Cluster started for concurrent writes test")
+
+	t.Run("concurrent_goroutine_writes", func(t *testing.T) {
+		var receivedCount int64
+		var uploadedCount int64
+
+		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 10000)
+		errChan := make(chan error, 1)
+
+		subCtx, subCancel := context.WithCancel(ctx)
+		defer subCancel()
+
+		// Start subscriber
+		go func() {
+			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/concurrent/", eventsChan)
+			if err != nil && !strings.Contains(err.Error(), "context") {
+				errChan <- err
+			}
+		}()
+
+		time.Sleep(2 * time.Second)
+
+		// Start counting received events
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case event := <-eventsChan:
+					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+						if !event.EventNotification.NewEntry.IsDirectory {
+							atomic.AddInt64(&receivedCount, 1)
+						}
+					}
+				case <-subCtx.Done():
+					return
+				}
+			}
+		}()
+
+		// Launch many concurrent writers
+		numWorkers := 50
+		filesPerWorker := 20
+		totalExpected := int64(numWorkers * filesPerWorker)
+
+		uploadWg := sync.WaitGroup{}
+		for w := 0; w < numWorkers; w++ {
+			uploadWg.Add(1)
+			go func(workerId int) {
+				defer uploadWg.Done()
+				for i := 0; i < filesPerWorker; i++ {
+					path := fmt.Sprintf("/concurrent/w%d/f%d.txt", workerId, i)
+					content := []byte(fmt.Sprintf("worker%d-file%d", workerId, i))
+					if err := uploadFile("http://127.0.0.1:8888"+path, content); err == nil {
+						atomic.AddInt64(&uploadedCount, 1)
+					}
+				}
+			}(w)
+		}
+
+		uploadWg.Wait()
+		uploaded := atomic.LoadInt64(&uploadedCount)
+		t.Logf("Uploaded %d/%d files from %d concurrent workers", uploaded, totalExpected, numWorkers)
+
+		// Wait for events with progress tracking
+		stallTimeout := time.After(90 * time.Second)
+		checkInterval := time.NewTicker(3 * time.Second)
+		defer checkInterval.Stop()
+
+		lastReceived := int64(0)
+		stableCount := 0
+
+	waitLoop:
+		for {
+			select {
+			case <-stallTimeout:
+				break waitLoop
+			case <-checkInterval.C:
+				received := atomic.LoadInt64(&receivedCount)
+				if received >= uploaded {
+					t.Logf("All %d events received", received)
+					break waitLoop
+				}
+				if received == lastReceived {
+					stableCount++
+					if stableCount >= 5 {
+						t.Logf("No progress for %d checks, received %d/%d", stableCount, received, uploaded)
+						break waitLoop
+					}
+				} else {
+					stableCount = 0
+					t.Logf("Progress: %d/%d (%.1f%%)", received, uploaded, float64(received)/float64(uploaded)*100)
+				}
+				lastReceived = received
+			case err := <-errChan:
+				t.Fatalf("Subscription error: %v", err)
+			}
+		}
+
+		subCancel()
+		wg.Wait()
+
+		received := atomic.LoadInt64(&receivedCount)
+		percentage := float64(received) / float64(uploaded) * 100
+		t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+		// Should receive at least 80% of events
+		assert.GreaterOrEqual(t, percentage, 80.0,
+			"Should receive at least 80%% of concurrent write events")
+	})
+}
+
+// TestMetadataSubscribeMillionUpdates tests subscription with 1 million metadata updates
+// This test creates metadata entries directly via gRPC without actual file content
+func TestMetadataSubscribeMillionUpdates(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	testDir, err := os.MkdirTemp("", "seaweedfs_million_updates_test_")
+	require.NoError(t, err)
+	defer os.RemoveAll(testDir)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+	defer cancel()
+
+	cluster, err := startSeaweedFSCluster(ctx, testDir)
+	require.NoError(t, err)
+	defer cluster.Stop()
+
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+	require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+	t.Logf("Cluster started for million updates test")
+
+	t.Run("million_metadata_updates", func(t *testing.T) {
+		var receivedCount int64
+		var createdCount int64
+		totalEntries := int64(1_000_000)
+
+		eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100000)
+		errChan := make(chan error, 1)
+
+		subCtx, subCancel := context.WithCancel(ctx)
+		defer subCancel()
+
+		// Start subscriber
+		go func() {
+			err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/million/", eventsChan)
+			if err != nil && !strings.Contains(err.Error(), "context") {
+				errChan <- err
+			}
+		}()
+
+		time.Sleep(2 * time.Second)
+
+		// Start counting received events
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case event := <-eventsChan:
+					if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+						if !event.EventNotification.NewEntry.IsDirectory {
+							atomic.AddInt64(&receivedCount, 1)
+						}
+					}
+				case <-subCtx.Done():
+					return
+				}
+			}
+		}()
+
+		// Create metadata entries directly via gRPC (no actual file content)
+		numWorkers := 100
+		entriesPerWorker := int(totalEntries) / numWorkers
+
+		startTime := time.Now()
+		createWg := sync.WaitGroup{}
+
+		for w := 0; w < numWorkers; w++ {
+			createWg.Add(1)
+			go func(workerId int) {
+				defer createWg.Done()
+				grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+				err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+					for i := 0; i < entriesPerWorker; i++ {
+						dir := fmt.Sprintf("/million/bucket%d", workerId%100)
+						name := fmt.Sprintf("entry_%d_%d", workerId, i)
+
+						_, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
+							Directory: dir,
+							Entry: &filer_pb.Entry{
+								Name:        name,
+								IsDirectory: false,
+								Attributes: &filer_pb.FuseAttributes{
+									FileSize: 100,
+									Mtime:    time.Now().Unix(),
+									FileMode: 0644,
+									Uid:      1000,
+									Gid:      1000,
+								},
+							},
+						})
+						if err == nil {
+							atomic.AddInt64(&createdCount, 1)
+						}
+
+						// Log progress every 10000 entries per worker
+						if i > 0 && i%10000 == 0 {
+							created := atomic.LoadInt64(&createdCount)
+							elapsed := time.Since(startTime)
+							rate := float64(created) / elapsed.Seconds()
+							t.Logf("Worker %d: created %d entries, total %d (%.0f/sec)",
+								workerId, i, created, rate)
+						}
+					}
+					return nil
+				})
+				if err != nil {
+					t.Logf("Worker %d error: %v", workerId, err)
+				}
+			}(w)
+		}
+
+		// Progress reporter
+		progressDone := make(chan struct{})
+		go func() {
+			ticker := time.NewTicker(10 * time.Second)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ticker.C:
+					created := atomic.LoadInt64(&createdCount)
+					received := atomic.LoadInt64(&receivedCount)
+					elapsed := time.Since(startTime)
+					createRate := float64(created) / elapsed.Seconds()
+					receiveRate := float64(received) / elapsed.Seconds()
+					t.Logf("Progress: created %d (%.0f/sec), received %d (%.0f/sec), lag %d",
+						created, createRate, received, receiveRate, created-received)
+				case <-progressDone:
+					return
+				}
+			}
+		}()
+
+		createWg.Wait()
+		close(progressDone)
+
+		created := atomic.LoadInt64(&createdCount)
+		elapsed := time.Since(startTime)
+		t.Logf("Created %d entries in %v (%.0f/sec)", created, elapsed, float64(created)/elapsed.Seconds())
+
+		// Wait for subscription to catch up
+		catchupTimeout := time.After(5 * time.Minute)
+		checkInterval := time.NewTicker(5 * time.Second)
+		defer checkInterval.Stop()
+
+		lastReceived := int64(0)
+		stableCount := 0
+
+	waitLoop:
+		for {
+			select {
+			case <-catchupTimeout:
+				t.Logf("Catchup timeout reached")
+				break waitLoop
+			case <-checkInterval.C:
+				received := atomic.LoadInt64(&receivedCount)
+				if received >= created {
+					t.Logf("All %d events received", received)
+					break waitLoop
+				}
+				if received == lastReceived {
+					stableCount++
+					if stableCount >= 10 {
+						t.Logf("No progress for %d checks", stableCount)
+						break waitLoop
+					}
+				} else {
+					stableCount = 0
+					rate := float64(received-lastReceived) / 5.0
+					t.Logf("Catching up: %d/%d (%.1f%%) at %.0f/sec",
+						received, created, float64(received)/float64(created)*100, rate)
+				}
+				lastReceived = received
+			case err := <-errChan:
+				t.Fatalf("Subscription error: %v", err)
+			}
+		}
+
+		subCancel()
+		wg.Wait()
+
+		received := atomic.LoadInt64(&receivedCount)
+		percentage := float64(received) / float64(created) * 100
+		totalTime := time.Since(startTime)
+		t.Logf("Final: created %d, received %d (%.1f%%) in %v", created, received, percentage, totalTime)
+
+		// For million entries, we expect at least 90% to be received
+		assert.GreaterOrEqual(t, percentage, 90.0,
+			"Should receive at least 90%% of million metadata events (received %.1f%%)", percentage)
+	})
+}
+
+// Helper types and functions
+
+type TestCluster struct {
+	masterCmd *exec.Cmd
+	volumeCmd *exec.Cmd
+	filerCmd  *exec.Cmd
+	testDir   string
+}
+
+func (c *TestCluster) Stop() {
+	if c.filerCmd != nil && c.filerCmd.Process != nil {
+		c.filerCmd.Process.Kill()
+		c.filerCmd.Wait()
+	}
+	if c.volumeCmd != nil && c.volumeCmd.Process != nil {
+		c.volumeCmd.Process.Kill()
+		c.volumeCmd.Wait()
+	}
+	if c.masterCmd != nil && c.masterCmd.Process != nil {
+		c.masterCmd.Process.Kill()
+		c.masterCmd.Wait()
+	}
+}
+
+func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, error) {
+	weedBinary := findWeedBinary()
+	if weedBinary == "" {
+		return nil, fmt.Errorf("weed binary not found")
+	}
+
+	cluster := &TestCluster{testDir: dataDir}
+
+	// Create directories
+	masterDir := filepath.Join(dataDir, "master")
+	volumeDir := filepath.Join(dataDir, "volume")
+	filerDir := filepath.Join(dataDir, "filer")
+	if err := os.MkdirAll(masterDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create master dir: %v", err)
+	}
+	if err := os.MkdirAll(volumeDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create volume dir: %v", err)
+	}
+	if err := os.MkdirAll(filerDir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create filer dir: %v", err)
+	}
+
+	// Start master server
+	masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+		"-port", "9333",
+		"-mdir", masterDir,
+		"-volumeSizeLimitMB", "10",
+		"-ip", "127.0.0.1",
+		"-peers", "none",
+	)
+	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create master log file: %v", err)
+	}
+	masterCmd.Stdout = masterLogFile
+	masterCmd.Stderr = masterLogFile
+	if err := masterCmd.Start(); err != nil {
+		return nil, fmt.Errorf("failed to start master: %v", err)
+	}
+	cluster.masterCmd = masterCmd
+
+	time.Sleep(2 * time.Second)
+
+	// Start volume server
+	volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+		"-port", "8080",
+		"-dir", volumeDir,
+		"-max", "10",
+		"-master", "127.0.0.1:9333",
+		"-ip", "127.0.0.1",
+	)
+	volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log"))
+	if err != nil {
+		cluster.Stop()
+		return nil, fmt.Errorf("failed to create volume log file: %v", err)
+	}
+	volumeCmd.Stdout = volumeLogFile
+	volumeCmd.Stderr = volumeLogFile
+	if err := volumeCmd.Start(); err != nil {
+		cluster.Stop()
+		return nil, fmt.Errorf("failed to start volume: %v", err)
+	}
+	cluster.volumeCmd = volumeCmd
+
+	time.Sleep(2 * time.Second)
+
+	// Start filer server
+	filerCmd := exec.CommandContext(ctx, weedBinary, "filer",
+		"-port", "8888",
+		"-master", "127.0.0.1:9333",
+		"-ip", "127.0.0.1",
+	)
+	filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log"))
+	if err != nil {
+		cluster.Stop()
+		return nil, fmt.Errorf("failed to create filer log file: %v", err)
+	}
+	filerCmd.Stdout = filerLogFile
+	filerCmd.Stderr = filerLogFile
+	if err := filerCmd.Start(); err != nil {
+		cluster.Stop()
+		return nil, fmt.Errorf("failed to start filer: %v", err)
+	}
+	cluster.filerCmd = filerCmd
+
+	time.Sleep(3 * time.Second)
+
+	return cluster, nil
+}
+
+func findWeedBinary() string {
+	candidates := []string{
+		"../../../weed/weed",
+		"../../weed/weed",
+		"./weed",
+		"weed",
+	}
+	for _, candidate := range candidates {
+		if _, err := os.Stat(candidate); err == nil {
+			return candidate
+		}
+	}
+	if path, err := exec.LookPath("weed"); err == nil {
+		return path
+	}
+	return ""
+}
+
+func waitForHTTPServer(url string, timeout time.Duration) error {
+	start := time.Now()
+	for time.Since(start) < timeout {
+		resp, err := http.Get(url)
+		if err == nil {
+			resp.Body.Close()
+			return nil
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+	return fmt.Errorf("timeout waiting for server %s", url)
+}
+
+func uploadFile(url string, content []byte) error {
+	// Create multipart form data
+	var buf bytes.Buffer
+	writer := multipart.NewWriter(&buf)
+
+	// Extract filename from URL path
+	parts := strings.Split(url, "/")
+	filename := parts[len(parts)-1]
+	if filename == "" {
+		filename = "file.txt"
+	}
+
+	// Create form file field
+	part, err := writer.CreateFormFile("file", filename)
+	if err != nil {
+		return fmt.Errorf("create form file: %v", err)
+	}
+	if _, err := part.Write(content); err != nil {
+		return fmt.Errorf("write content: %v", err)
+	}
+	if err := writer.Close(); err != nil {
+		return fmt.Errorf("close writer: %v", err)
+	}
+
+	req, err := http.NewRequest("POST", url, &buf)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", writer.FormDataContentType())
+
+	client := &http.Client{Timeout: 10 * time.Second}
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode >= 400 {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
+	}
+	return nil
+}
+
+func subscribeToMetadata(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+	return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, time.Now().UnixNano(), eventsChan)
+}
+
+func subscribeToMetadataFromBeginning(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+	// Start from Unix epoch to get all events
+	return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, 0, eventsChan)
+}
+
+func subscribeToMetadataWithOptions(ctx context.Context, filerGrpcAddress, pathPrefix string, sinceNs int64, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+	if grpcDialOption == nil {
+		grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+	}
+
+	return pb.WithFilerClient(false, 0, pb.ServerAddress(filerGrpcAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+			ClientName: "integration_test",
+			PathPrefix: pathPrefix,
+			SinceNs:    sinceNs,
+			ClientId:   util.RandomInt32(),
+		})
+		if err != nil {
+			return fmt.Errorf("subscribe: %v", err)
+		}
+
+		for {
+			resp, err := stream.Recv()
+			if err != nil {
+				if err == io.EOF || ctx.Err() != nil {
+					return nil
+				}
+				return err
+			}
+
+			select {
+			case eventsChan <- resp:
+			case <-ctx.Done():
+				return nil
+			case <-time.After(100 * time.Millisecond):
+				// Channel full after brief wait, log warning
+				glog.Warningf("Event channel full, skipping event for %s", resp.Directory)
+			}
+		}
+	})
+}
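For reference, the gRPC pattern these tests exercise can be reduced to the standalone sketch below. It mirrors subscribeToMetadataWithOptions from the diff rather than adding new behavior; the client name "example_subscriber" and the filer address 127.0.0.1:8888 are assumptions for illustration, and it uses only calls that appear in the test file (pb.WithFilerClient, filer_pb.SubscribeMetadata, stream.Recv).

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	dial := grpc.WithTransportCredentials(insecure.NewCredentials())

	// Same call shape as subscribeToMetadataWithOptions in the test above;
	// assumes a local filer started the way startSeaweedFSCluster does.
	err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), dial, func(client filer_pb.SeaweedFilerClient) error {
		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: "example_subscriber", // hypothetical client name
			PathPrefix: "/",
			SinceNs:    0, // from the beginning, as in subscribeToMetadataFromBeginning
			ClientId:   util.RandomInt32(),
		})
		if err != nil {
			return err
		}
		for {
			resp, err := stream.Recv()
			if err == io.EOF {
				return nil
			}
			if err != nil {
				return err
			}
			// Print the full path of each created or updated entry.
			if n := resp.EventNotification; n != nil && n.NewEntry != nil {
				fmt.Println(resp.Directory + "/" + n.NewEntry.Name)
			}
		}
	})
	if err != nil {
		fmt.Println("subscribe error:", err)
	}
}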
