-rw-r--r--  .github/workflows/metadata-subscribe-tests.yml                   92
-rw-r--r--  test/erasure_coding/ec_integration_test.go                        1
-rw-r--r--  test/metadata_subscribe/Makefile                                  27
-rw-r--r--  test/metadata_subscribe/README.md                                 49
-rw-r--r--  test/metadata_subscribe/metadata_subscribe_integration_test.go  917
-rw-r--r--  weed/util/log_buffer/log_buffer.go                                 5
6 files changed, 1090 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/metadata-subscribe-tests.yml b/.github/workflows/metadata-subscribe-tests.yml
new file mode 100644
index 000000000..a27a9f296
--- /dev/null
+++ b/.github/workflows/metadata-subscribe-tests.yml
@@ -0,0 +1,92 @@
+name: "Metadata Subscribe Integration Tests"
+
+on:
+ push:
+ branches: [ master ]
+ paths:
+ - 'weed/filer/**'
+ - 'weed/pb/filer_pb/**'
+ - 'weed/util/log_buffer/**'
+ - 'weed/server/filer_grpc_server_sub_meta.go'
+ - 'weed/command/filer_backup.go'
+ - 'test/metadata_subscribe/**'
+ - '.github/workflows/metadata-subscribe-tests.yml'
+ pull_request:
+ branches: [ master ]
+ paths:
+ - 'weed/filer/**'
+ - 'weed/pb/filer_pb/**'
+ - 'weed/util/log_buffer/**'
+ - 'weed/server/filer_grpc_server_sub_meta.go'
+ - 'weed/command/filer_backup.go'
+ - 'test/metadata_subscribe/**'
+ - '.github/workflows/metadata-subscribe-tests.yml'
+
+concurrency:
+ group: ${{ github.head_ref }}/metadata-subscribe-tests
+ cancel-in-progress: true
+
+permissions:
+ contents: read
+
+env:
+ GO_VERSION: '1.24'
+ TEST_TIMEOUT: '10m'
+
+jobs:
+ metadata-subscribe-integration:
+ name: Metadata Subscribe Integration Tests
+ runs-on: ubuntu-22.04
+ timeout-minutes: 20
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v6
+
+ - name: Set up Go ${{ env.GO_VERSION }}
+ uses: actions/setup-go@v6
+ with:
+ go-version: ${{ env.GO_VERSION }}
+
+ - name: Build SeaweedFS
+ run: |
+ cd weed
+ go build -o weed .
+ chmod +x weed
+ ./weed version
+
+ - name: Run Metadata Subscribe Integration Tests
+ run: |
+ cd test/metadata_subscribe
+
+ echo "Running Metadata Subscribe integration tests..."
+ echo "============================================"
+
+ # Run tests with verbose output
+ go test -v -timeout=${{ env.TEST_TIMEOUT }} ./...
+
+ echo "============================================"
+ echo "Metadata Subscribe integration tests completed"
+
+ - name: Archive logs on failure
+ if: failure()
+ uses: actions/upload-artifact@v5
+ with:
+ name: metadata-subscribe-test-logs
+ path: |
+ /tmp/seaweedfs_*
+ retention-days: 7
+
+ - name: Test Summary
+ if: always()
+ run: |
+ echo "## Metadata Subscribe Integration Test Summary" >> $GITHUB_STEP_SUMMARY
+ echo "" >> $GITHUB_STEP_SUMMARY
+ echo "### Test Coverage" >> $GITHUB_STEP_SUMMARY
+ echo "- **Basic Subscription**: Subscribe to metadata changes and receive events" >> $GITHUB_STEP_SUMMARY
+ echo "- **Single-Filer No Stall**: Regression test for issue #4977" >> $GITHUB_STEP_SUMMARY
+ echo "- **Resume from Disk**: Verify subscription can resume from persisted logs" >> $GITHUB_STEP_SUMMARY
+ echo "" >> $GITHUB_STEP_SUMMARY
+ echo "### Related Issues" >> $GITHUB_STEP_SUMMARY
+ echo "- [#4977](https://github.com/seaweedfs/seaweedfs/issues/4977): filer.backup synchronisation stall" >> $GITHUB_STEP_SUMMARY
+
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index e2acebd53..71f77683f 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -913,6 +913,7 @@ func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskClust
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
+ "-peers", "none",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
diff --git a/test/metadata_subscribe/Makefile b/test/metadata_subscribe/Makefile
new file mode 100644
index 000000000..93ee69202
--- /dev/null
+++ b/test/metadata_subscribe/Makefile
@@ -0,0 +1,27 @@
+.PHONY: test test-short build-weed clean
+
+# Build weed binary first if needed
+build-weed:
+ cd ../../weed && go build -o weed .
+
+# Run all integration tests
+test: build-weed
+ go test -v -timeout 5m .
+
+# Run tests in short mode (skip integration tests)
+test-short:
+ go test -v -short .
+
+# Run specific test
+test-basic: build-weed
+ go test -v -timeout 3m -run TestMetadataSubscribeBasic .
+
+test-stall: build-weed
+ go test -v -timeout 5m -run TestMetadataSubscribeSingleFilerNoStall .
+
+test-resume: build-weed
+ go test -v -timeout 3m -run TestMetadataSubscribeResumeFromDisk .
+
+clean:
+ rm -f ../../weed/weed
+
diff --git a/test/metadata_subscribe/README.md b/test/metadata_subscribe/README.md
new file mode 100644
index 000000000..48e900839
--- /dev/null
+++ b/test/metadata_subscribe/README.md
@@ -0,0 +1,49 @@
+# Metadata Subscribe Integration Tests
+
+This directory contains integration tests for the SeaweedFS metadata subscription functionality.
+
+## Tests
+
+### TestMetadataSubscribeBasic
+Tests basic metadata subscription functionality:
+- Start a SeaweedFS cluster (master, volume, filer)
+- Subscribe to metadata changes
+- Upload files and verify events are received
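+
+For reference, a minimal standalone subscriber can be sketched along the lines of the
+`subscribeToMetadataWithOptions` helper in the integration test; the filer address,
+path prefix, and client name below are illustrative:
+
+```go
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+func main() {
+	grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+	err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), grpcDialOption,
+		func(client filer_pb.SeaweedFilerClient) error {
+			stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+				ClientName: "example_subscriber",
+				PathPrefix: "/",
+				SinceNs:    0, // 0 = from the beginning, including logs persisted on disk
+				ClientId:   util.RandomInt32(),
+			})
+			if err != nil {
+				return err
+			}
+			for {
+				resp, err := stream.Recv()
+				if err != nil {
+					return err
+				}
+				// Each response carries the directory and the create/update/delete notification.
+				fmt.Printf("event in %s: %+v\n", resp.Directory, resp.EventNotification)
+			}
+		})
+	if err != nil {
+		fmt.Println("subscribe error:", err)
+	}
+}
+```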
+
+### TestMetadataSubscribeSingleFilerNoStall
+Regression test for [issue #4977](https://github.com/seaweedfs/seaweedfs/issues/4977):
+- Tests that metadata subscription doesn't stall in single-filer setups
+- Simulates high-load file uploads while a subscriber tries to keep up
+- Verifies that events are received without significant stalling
+
+The bug was that, in single-filer setups, `SubscribeMetadata` would block indefinitely
+on `MetaAggregator.MetaLogBuffer`, which stays empty because there are no peer filers
+to aggregate from. The fix makes the subscription fall back to reading the persisted
+logs on disk whenever that in-memory buffer is empty.
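+
+The sentinel-error pattern the fix relies on can be illustrated with a small,
+self-contained sketch; `resumeFromDisk`, `readFromBuffer`, and `readFromDisk` are
+placeholders for illustration, not the actual SeaweedFS functions:
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+)
+
+// Placeholder sentinel, playing the role of log_buffer.ResumeFromDiskError.
+var resumeFromDisk = errors.New("resume from disk")
+
+// readFromBuffer stands in for the in-memory buffer read; here it is always empty,
+// like MetaAggregator.MetaLogBuffer in a single-filer setup.
+func readFromBuffer() ([]string, error) {
+	return nil, resumeFromDisk
+}
+
+// readFromDisk stands in for reading metadata events already flushed to disk.
+func readFromDisk() []string {
+	return []string{"event-from-disk"}
+}
+
+func main() {
+	events, err := readFromBuffer()
+	if errors.Is(err, resumeFromDisk) {
+		// Instead of blocking on the empty buffer, fall back to persisted logs.
+		events = readFromDisk()
+	} else if err != nil {
+		panic(err)
+	}
+	fmt.Println(events)
+}
+```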
+
+### TestMetadataSubscribeResumeFromDisk
+Tests that subscription can resume from disk:
+- Upload files before starting subscription
+- Wait for logs to be flushed to disk
+- Start subscription from the beginning
+- Verify pre-uploaded files are received from disk
+
+## Running Tests
+
+```bash
+# Run all tests (requires weed binary in PATH or built)
+go test -v ./test/metadata_subscribe/...
+
+# Skip integration tests
+go test -short ./test/metadata_subscribe/...
+
+# Run with increased timeout for slow systems
+go test -v -timeout 5m ./test/metadata_subscribe/...
+```
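+
+The Makefile in this directory wraps the same commands and builds the `weed` binary
+first, for example:
+
+```bash
+make test          # run all integration tests
+make test-basic    # TestMetadataSubscribeBasic only
+make test-stall    # TestMetadataSubscribeSingleFilerNoStall only
+make test-resume   # TestMetadataSubscribeResumeFromDisk only
+```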
+
+## Requirements
+
+- `weed` binary must be available in PATH or in the parent directories
+- Tests create temporary directories that are cleaned up after completion
+- Tests use ports 9333 (master), 8080 (volume), 8888 (filer)
+
diff --git a/test/metadata_subscribe/metadata_subscribe_integration_test.go b/test/metadata_subscribe/metadata_subscribe_integration_test.go
new file mode 100644
index 000000000..9d8b7c39b
--- /dev/null
+++ b/test/metadata_subscribe/metadata_subscribe_integration_test.go
@@ -0,0 +1,917 @@
+package metadata_subscribe
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+// TestMetadataSubscribeBasic tests basic metadata subscription functionality
+func TestMetadataSubscribeBasic(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_metadata_subscribe_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("SeaweedFS cluster started successfully")
+
+ t.Run("subscribe_and_receive_events", func(t *testing.T) {
+ // Create a channel to receive events
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+ errChan := make(chan error, 1)
+
+ // Start subscribing in a goroutine
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context canceled") {
+ errChan <- err
+ }
+ }()
+
+ // Wait for subscription to be established
+ time.Sleep(2 * time.Second)
+
+ // Create test files via HTTP
+ testFiles := []string{
+ "/test/file1.txt",
+ "/test/file2.txt",
+ "/test/subdir/file3.txt",
+ }
+
+ for _, path := range testFiles {
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte("test content for "+path))
+ require.NoError(t, err, "Failed to upload %s", path)
+ t.Logf("Uploaded %s", path)
+ }
+
+ // Collect events with timeout
+ receivedPaths := make(map[string]bool)
+ timeout := time.After(30 * time.Second)
+
+ eventLoop:
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ path := filepath.Join(event.Directory, event.EventNotification.NewEntry.Name)
+ t.Logf("Received event for: %s", path)
+ receivedPaths[path] = true
+ }
+ // Check if we received all expected events
+ allReceived := true
+ for _, p := range testFiles {
+ if !receivedPaths[p] {
+ allReceived = false
+ break
+ }
+ }
+ if allReceived {
+ break eventLoop
+ }
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ case <-timeout:
+ t.Logf("Timeout waiting for events. Received %d/%d events", len(receivedPaths), len(testFiles))
+ break eventLoop
+ }
+ }
+
+ // Verify we received events for all test files
+ for _, path := range testFiles {
+ assert.True(t, receivedPaths[path], "Should have received event for %s", path)
+ }
+ })
+}
+
+// TestMetadataSubscribeSingleFilerNoStall tests that subscription doesn't stall
+// in single-filer setups (regression test for issue #4977)
+func TestMetadataSubscribeSingleFilerNoStall(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_single_filer_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Single-filer cluster started")
+
+ t.Run("high_load_subscription_no_stall", func(t *testing.T) {
+ // This test simulates the scenario from issue #4977:
+ // High-load writes while a subscriber tries to keep up
+
+ var receivedCount int64
+ var uploadedCount int64
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 1000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context canceled") {
+ errChan <- err
+ }
+ }()
+
+ // Wait for subscription to be established
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Upload files concurrently (simulate high load)
+ numFiles := 100
+ numWorkers := 10
+
+ uploadWg := sync.WaitGroup{}
+ for w := 0; w < numWorkers; w++ {
+ uploadWg.Add(1)
+ go func(workerId int) {
+ defer uploadWg.Done()
+ for i := 0; i < numFiles/numWorkers; i++ {
+ path := fmt.Sprintf("/load_test/worker%d/file%d.txt", workerId, i)
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d-%d", workerId, i)))
+ if err == nil {
+ atomic.AddInt64(&uploadedCount, 1)
+ }
+ }
+ }(w)
+ }
+
+ uploadWg.Wait()
+ uploaded := atomic.LoadInt64(&uploadedCount)
+ t.Logf("Uploaded %d files", uploaded)
+
+ // Wait for events to be received (with timeout to detect stall)
+ stallTimeout := time.After(60 * time.Second)
+ checkInterval := time.NewTicker(2 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := atomic.LoadInt64(&receivedCount)
+ staleCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-stallTimeout:
+ received := atomic.LoadInt64(&receivedCount)
+ t.Logf("Timeout: received %d/%d events (%.1f%%)",
+ received, uploaded, float64(received)/float64(uploaded)*100)
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= uploaded {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ staleCount++
+ if staleCount >= 5 {
+ // If no progress for 10 seconds, subscription may be stalled
+ t.Logf("WARNING: No progress for %d checks. Received %d/%d (%.1f%%)",
+ staleCount, received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ } else {
+ staleCount = 0
+ t.Logf("Progress: received %d/%d events (%.1f%%)",
+ received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+
+ // With the fix for #4977, we should receive a high percentage of events
+ // Before the fix, this would stall at ~20-40%
+ percentage := float64(received) / float64(uploaded) * 100
+ t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+ // We should receive at least 80% of events (allowing for some timing issues)
+ assert.GreaterOrEqual(t, percentage, 80.0,
+ "Should receive at least 80%% of events (received %.1f%%)", percentage)
+ })
+}
+
+// TestMetadataSubscribeResumeFromDisk tests that subscription can resume from disk
+func TestMetadataSubscribeResumeFromDisk(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_resume_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Run("upload_before_subscribe", func(t *testing.T) {
+ // Upload files BEFORE starting subscription
+ numFiles := 20
+ for i := 0; i < numFiles; i++ {
+ path := fmt.Sprintf("/pre_subscribe/file%d.txt", i)
+ err := uploadFile("http://127.0.0.1:8888"+path, []byte(fmt.Sprintf("content %d", i)))
+ require.NoError(t, err)
+ }
+ t.Logf("Uploaded %d files before subscription", numFiles)
+
+ // Wait for logs to be flushed to disk
+ time.Sleep(15 * time.Second)
+
+ // Now start subscription from the beginning
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithTimeout(ctx, 30*time.Second)
+ defer subCancel()
+
+ go func() {
+ err := subscribeToMetadataFromBeginning(subCtx, "127.0.0.1:8888", "/pre_subscribe/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
+ // Count received events
+ receivedCount := 0
+ timeout := time.After(30 * time.Second)
+
+ countLoop:
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ receivedCount++
+ t.Logf("Received event %d: %s/%s", receivedCount,
+ event.Directory, event.EventNotification.NewEntry.Name)
+ }
+ }
+ if receivedCount >= numFiles {
+ break countLoop
+ }
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ case <-timeout:
+ t.Logf("Timeout: received %d/%d events", receivedCount, numFiles)
+ break countLoop
+ }
+ }
+
+ // Should receive all pre-uploaded files from disk
+ assert.GreaterOrEqual(t, receivedCount, numFiles-2, // Allow small margin
+ "Should receive most pre-uploaded files from disk (received %d/%d)", receivedCount, numFiles)
+ })
+}
+
+// TestMetadataSubscribeConcurrentWrites tests subscription with concurrent writes
+func TestMetadataSubscribeConcurrentWrites(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_concurrent_writes_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Cluster started for concurrent writes test")
+
+ t.Run("concurrent_goroutine_writes", func(t *testing.T) {
+ var receivedCount int64
+ var uploadedCount int64
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 10000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/concurrent/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Launch many concurrent writers
+ numWorkers := 50
+ filesPerWorker := 20
+ totalExpected := int64(numWorkers * filesPerWorker)
+
+ uploadWg := sync.WaitGroup{}
+ for w := 0; w < numWorkers; w++ {
+ uploadWg.Add(1)
+ go func(workerId int) {
+ defer uploadWg.Done()
+ for i := 0; i < filesPerWorker; i++ {
+ path := fmt.Sprintf("/concurrent/w%d/f%d.txt", workerId, i)
+ content := []byte(fmt.Sprintf("worker%d-file%d", workerId, i))
+ if err := uploadFile("http://127.0.0.1:8888"+path, content); err == nil {
+ atomic.AddInt64(&uploadedCount, 1)
+ }
+ }
+ }(w)
+ }
+
+ uploadWg.Wait()
+ uploaded := atomic.LoadInt64(&uploadedCount)
+ t.Logf("Uploaded %d/%d files from %d concurrent workers", uploaded, totalExpected, numWorkers)
+
+ // Wait for events with progress tracking
+ stallTimeout := time.After(90 * time.Second)
+ checkInterval := time.NewTicker(3 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := int64(0)
+ stableCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-stallTimeout:
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= uploaded {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ stableCount++
+ if stableCount >= 5 {
+ t.Logf("No progress for %d checks, received %d/%d", stableCount, received, uploaded)
+ break waitLoop
+ }
+ } else {
+ stableCount = 0
+ t.Logf("Progress: %d/%d (%.1f%%)", received, uploaded, float64(received)/float64(uploaded)*100)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+ percentage := float64(received) / float64(uploaded) * 100
+ t.Logf("Final: received %d/%d events (%.1f%%)", received, uploaded, percentage)
+
+ // Should receive at least 80% of events
+ assert.GreaterOrEqual(t, percentage, 80.0,
+ "Should receive at least 80%% of concurrent write events")
+ })
+}
+
+// TestMetadataSubscribeMillionUpdates tests subscription with 1 million metadata updates
+// This test creates metadata entries directly via gRPC without actual file content
+func TestMetadataSubscribeMillionUpdates(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_million_updates_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+ defer cancel()
+
+ cluster, err := startSeaweedFSCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:9333", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8080", 30*time.Second))
+ require.NoError(t, waitForHTTPServer("http://127.0.0.1:8888", 30*time.Second))
+
+ t.Logf("Cluster started for million updates test")
+
+ t.Run("million_metadata_updates", func(t *testing.T) {
+ var receivedCount int64
+ var createdCount int64
+ totalEntries := int64(1_000_000)
+
+ eventsChan := make(chan *filer_pb.SubscribeMetadataResponse, 100000)
+ errChan := make(chan error, 1)
+
+ subCtx, subCancel := context.WithCancel(ctx)
+ defer subCancel()
+
+ // Start subscriber
+ go func() {
+ err := subscribeToMetadata(subCtx, "127.0.0.1:8888", "/million/", eventsChan)
+ if err != nil && !strings.Contains(err.Error(), "context") {
+ errChan <- err
+ }
+ }()
+
+ time.Sleep(2 * time.Second)
+
+ // Start counting received events
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case event := <-eventsChan:
+ if event.EventNotification != nil && event.EventNotification.NewEntry != nil {
+ if !event.EventNotification.NewEntry.IsDirectory {
+ atomic.AddInt64(&receivedCount, 1)
+ }
+ }
+ case <-subCtx.Done():
+ return
+ }
+ }
+ }()
+
+ // Create metadata entries directly via gRPC (no actual file content)
+ numWorkers := 100
+ entriesPerWorker := int(totalEntries) / numWorkers
+
+ startTime := time.Now()
+ createWg := sync.WaitGroup{}
+
+ for w := 0; w < numWorkers; w++ {
+ createWg.Add(1)
+ go func(workerId int) {
+ defer createWg.Done()
+ grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+ err := pb.WithFilerClient(false, 0, pb.ServerAddress("127.0.0.1:8888"), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ for i := 0; i < entriesPerWorker; i++ {
+ dir := fmt.Sprintf("/million/bucket%d", workerId%100)
+ name := fmt.Sprintf("entry_%d_%d", workerId, i)
+
+ _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ FileSize: 100,
+ Mtime: time.Now().Unix(),
+ FileMode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ },
+ },
+ })
+ if err == nil {
+ atomic.AddInt64(&createdCount, 1)
+ }
+
+ // Log progress every 10000 entries per worker
+ if i > 0 && i%10000 == 0 {
+ created := atomic.LoadInt64(&createdCount)
+ elapsed := time.Since(startTime)
+ rate := float64(created) / elapsed.Seconds()
+ t.Logf("Worker %d: created %d entries, total %d (%.0f/sec)",
+ workerId, i, created, rate)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ t.Logf("Worker %d error: %v", workerId, err)
+ }
+ }(w)
+ }
+
+ // Progress reporter
+ progressDone := make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ created := atomic.LoadInt64(&createdCount)
+ received := atomic.LoadInt64(&receivedCount)
+ elapsed := time.Since(startTime)
+ createRate := float64(created) / elapsed.Seconds()
+ receiveRate := float64(received) / elapsed.Seconds()
+ t.Logf("Progress: created %d (%.0f/sec), received %d (%.0f/sec), lag %d",
+ created, createRate, received, receiveRate, created-received)
+ case <-progressDone:
+ return
+ }
+ }
+ }()
+
+ createWg.Wait()
+ close(progressDone)
+
+ created := atomic.LoadInt64(&createdCount)
+ elapsed := time.Since(startTime)
+ t.Logf("Created %d entries in %v (%.0f/sec)", created, elapsed, float64(created)/elapsed.Seconds())
+
+ // Wait for subscription to catch up
+ catchupTimeout := time.After(5 * time.Minute)
+ checkInterval := time.NewTicker(5 * time.Second)
+ defer checkInterval.Stop()
+
+ lastReceived := int64(0)
+ stableCount := 0
+
+ waitLoop:
+ for {
+ select {
+ case <-catchupTimeout:
+ t.Logf("Catchup timeout reached")
+ break waitLoop
+ case <-checkInterval.C:
+ received := atomic.LoadInt64(&receivedCount)
+ if received >= created {
+ t.Logf("All %d events received", received)
+ break waitLoop
+ }
+ if received == lastReceived {
+ stableCount++
+ if stableCount >= 10 {
+ t.Logf("No progress for %d checks", stableCount)
+ break waitLoop
+ }
+ } else {
+ stableCount = 0
+ rate := float64(received-lastReceived) / 5.0
+ t.Logf("Catching up: %d/%d (%.1f%%) at %.0f/sec",
+ received, created, float64(received)/float64(created)*100, rate)
+ }
+ lastReceived = received
+ case err := <-errChan:
+ t.Fatalf("Subscription error: %v", err)
+ }
+ }
+
+ subCancel()
+ wg.Wait()
+
+ received := atomic.LoadInt64(&receivedCount)
+ percentage := float64(received) / float64(created) * 100
+ totalTime := time.Since(startTime)
+ t.Logf("Final: created %d, received %d (%.1f%%) in %v", created, received, percentage, totalTime)
+
+ // For million entries, we expect at least 90% to be received
+ assert.GreaterOrEqual(t, percentage, 90.0,
+ "Should receive at least 90%% of million metadata events (received %.1f%%)", percentage)
+ })
+}
+
+// Helper types and functions
+
+type TestCluster struct {
+ masterCmd *exec.Cmd
+ volumeCmd *exec.Cmd
+ filerCmd *exec.Cmd
+ testDir string
+}
+
+func (c *TestCluster) Stop() {
+ if c.filerCmd != nil && c.filerCmd.Process != nil {
+ c.filerCmd.Process.Kill()
+ c.filerCmd.Wait()
+ }
+ if c.volumeCmd != nil && c.volumeCmd.Process != nil {
+ c.volumeCmd.Process.Kill()
+ c.volumeCmd.Wait()
+ }
+ if c.masterCmd != nil && c.masterCmd.Process != nil {
+ c.masterCmd.Process.Kill()
+ c.masterCmd.Wait()
+ }
+}
+
+func startSeaweedFSCluster(ctx context.Context, dataDir string) (*TestCluster, error) {
+ weedBinary := findWeedBinary()
+ if weedBinary == "" {
+ return nil, fmt.Errorf("weed binary not found")
+ }
+
+ cluster := &TestCluster{testDir: dataDir}
+
+ // Create directories
+ masterDir := filepath.Join(dataDir, "master")
+ volumeDir := filepath.Join(dataDir, "volume")
+ filerDir := filepath.Join(dataDir, "filer")
+ if err := os.MkdirAll(masterDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create master dir: %v", err)
+ }
+ if err := os.MkdirAll(volumeDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create volume dir: %v", err)
+ }
+ if err := os.MkdirAll(filerDir, 0755); err != nil {
+ return nil, fmt.Errorf("failed to create filer dir: %v", err)
+ }
+
+ // Start master server
+ masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+ "-port", "9333",
+ "-mdir", masterDir,
+ "-volumeSizeLimitMB", "10",
+ "-ip", "127.0.0.1",
+ "-peers", "none",
+ )
+ masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create master log file: %v", err)
+ }
+ masterCmd.Stdout = masterLogFile
+ masterCmd.Stderr = masterLogFile
+ if err := masterCmd.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start master: %v", err)
+ }
+ cluster.masterCmd = masterCmd
+
+ time.Sleep(2 * time.Second)
+
+ // Start volume server
+ volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+ "-port", "8080",
+ "-dir", volumeDir,
+ "-max", "10",
+ "-master", "127.0.0.1:9333",
+ "-ip", "127.0.0.1",
+ )
+ volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create volume log file: %v", err)
+ }
+ volumeCmd.Stdout = volumeLogFile
+ volumeCmd.Stderr = volumeLogFile
+ if err := volumeCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start volume: %v", err)
+ }
+ cluster.volumeCmd = volumeCmd
+
+ time.Sleep(2 * time.Second)
+
+ // Start filer server
+ filerCmd := exec.CommandContext(ctx, weedBinary, "filer",
+ "-port", "8888",
+ "-master", "127.0.0.1:9333",
+ "-ip", "127.0.0.1",
+ )
+ filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create filer log file: %v", err)
+ }
+ filerCmd.Stdout = filerLogFile
+ filerCmd.Stderr = filerLogFile
+ if err := filerCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start filer: %v", err)
+ }
+ cluster.filerCmd = filerCmd
+
+ time.Sleep(3 * time.Second)
+
+ return cluster, nil
+}
+
+func findWeedBinary() string {
+ candidates := []string{
+ "../../../weed/weed",
+ "../../weed/weed",
+ "./weed",
+ "weed",
+ }
+ for _, candidate := range candidates {
+ if _, err := os.Stat(candidate); err == nil {
+ return candidate
+ }
+ }
+ if path, err := exec.LookPath("weed"); err == nil {
+ return path
+ }
+ return ""
+}
+
+func waitForHTTPServer(url string, timeout time.Duration) error {
+ start := time.Now()
+ for time.Since(start) < timeout {
+ resp, err := http.Get(url)
+ if err == nil {
+ resp.Body.Close()
+ return nil
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ return fmt.Errorf("timeout waiting for server %s", url)
+}
+
+func uploadFile(url string, content []byte) error {
+ // Create multipart form data
+ var buf bytes.Buffer
+ writer := multipart.NewWriter(&buf)
+
+ // Extract filename from URL path
+ parts := strings.Split(url, "/")
+ filename := parts[len(parts)-1]
+ if filename == "" {
+ filename = "file.txt"
+ }
+
+ // Create form file field
+ part, err := writer.CreateFormFile("file", filename)
+ if err != nil {
+ return fmt.Errorf("create form file: %v", err)
+ }
+ if _, err := part.Write(content); err != nil {
+ return fmt.Errorf("write content: %v", err)
+ }
+ if err := writer.Close(); err != nil {
+ return fmt.Errorf("close writer: %v", err)
+ }
+
+ req, err := http.NewRequest("POST", url, &buf)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", writer.FormDataContentType())
+
+ client := &http.Client{Timeout: 10 * time.Second}
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode >= 400 {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
+ }
+ return nil
+}
+
+func subscribeToMetadata(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, time.Now().UnixNano(), eventsChan)
+}
+
+func subscribeToMetadataFromBeginning(ctx context.Context, filerGrpcAddress, pathPrefix string, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ // Start from Unix epoch to get all events
+ return subscribeToMetadataWithOptions(ctx, filerGrpcAddress, pathPrefix, 0, eventsChan)
+}
+
+func subscribeToMetadataWithOptions(ctx context.Context, filerGrpcAddress, pathPrefix string, sinceNs int64, eventsChan chan<- *filer_pb.SubscribeMetadataResponse) error {
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ if grpcDialOption == nil {
+ grpcDialOption = grpc.WithTransportCredentials(insecure.NewCredentials())
+ }
+
+ return pb.WithFilerClient(false, 0, pb.ServerAddress(filerGrpcAddress), grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "integration_test",
+ PathPrefix: pathPrefix,
+ SinceNs: sinceNs,
+ ClientId: util.RandomInt32(),
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF || ctx.Err() != nil {
+ return nil
+ }
+ return err
+ }
+
+ select {
+ case eventsChan <- resp:
+ case <-ctx.Done():
+ return nil
+ case <-time.After(100 * time.Millisecond):
+ // Channel full after brief wait, log warning
+ glog.Warningf("Event channel full, skipping event for %s", resp.Directory)
+ }
+ }
+ })
+}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 22e69cc60..853cbe475 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -723,7 +723,10 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
}
if tsMemory.IsZero() { // case 2.2
- return nil, -2, nil
+ // Buffer is empty - return ResumeFromDiskError so caller can read from disk
+ // This fixes issue #4977 where SubscribeMetadata stalls because
+ // MetaAggregator.MetaLogBuffer is empty in single-filer setups
+ return nil, -2, ResumeFromDiskError
} else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
// For time-based reads, only check timestamp for disk reads
// Don't use offset comparisons as they're not meaningful for time-based subscriptions