aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read_test.go
blob: 802dcdacf57a7f108452c7b57f233cf1fff76f1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package log_buffer

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// TestLoopProcessLogDataWithOffset_ClientDisconnect verifies that
// LoopProcessLogDataWithOffset returns with isDone=true soon after
// waitForDataFn begins reporting false, which is how this test models a
// client that has disconnected.
func TestLoopProcessLogDataWithOffset_ClientDisconnect(t *testing.T) {
	noopFlush := func(*LogBuffer, time.Time, time.Time, []byte, int64, int64) {}
	lb := NewLogBuffer("test", 1*time.Minute, noopFlush, nil, nil)
	defer lb.ShutdownLogBuffer()

	// The context deadline stands in for the client dropping off after 100ms.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Keep waiting (true) while the context is live; once it has expired,
	// report the disconnect (false).
	stillConnected := func() bool {
		return ctx.Err() == nil
	}

	// Accept every entry; the buffer is never written to in this test.
	acceptEntry := func(*filer_pb.LogEntry, int64) (bool, error) {
		return true, nil
	}

	from := NewMessagePositionFromOffset(0)
	began := time.Now()

	// Only isDone is inspected; the returned position and error are ignored.
	_, isDone, _ := lb.LoopProcessLogDataWithOffset("test-client", from, 0, stillConnected, acceptEntry)

	elapsed := time.Since(began)

	if !isDone {
		t.Errorf("Expected isDone=true when client disconnects, got false")
	}

	// The disconnect happens at ~100ms; 500ms is the generous ceiling allowed
	// for the loop to notice it and return.
	const exitDeadline = 500 * time.Millisecond
	if elapsed > exitDeadline {
		t.Errorf("Loop took too long to exit: %v (expected < 500ms)", elapsed)
	}

	t.Logf("Loop exited cleanly in %v after client disconnect", elapsed)
}

// TestLoopProcessLogDataWithOffset_EmptyBuffer checks that reading from an
// empty buffer neither spins nor stalls: the loop must poll waitForDataFn
// exactly maxCalls times and the total elapsed time must fall inside a
// window derived from an assumed ~10ms pause per idle iteration.
func TestLoopProcessLogDataWithOffset_EmptyBuffer(t *testing.T) {
	noopFlush := func(*LogBuffer, time.Time, time.Time, []byte, int64, int64) {}
	lb := NewLogBuffer("test", 1*time.Minute, noopFlush, nil, nil)
	defer lb.ShutdownLogBuffer()

	const maxCalls = 10
	var (
		mu    sync.Mutex
		calls int
	)

	// Counts invocations and reports "disconnected" on the maxCalls-th call
	// so the loop is guaranteed to terminate.
	waitForDataFn := func() bool {
		mu.Lock()
		calls++
		keepWaiting := calls < maxCalls
		mu.Unlock()
		return keepWaiting
	}

	acceptEntry := func(*filer_pb.LogEntry, int64) (bool, error) {
		return true, nil
	}

	from := NewMessagePositionFromOffset(0)
	began := time.Now()

	_, isDone, _ := lb.LoopProcessLogDataWithOffset("test-client", from, 0, waitForDataFn, acceptEntry)

	elapsed := time.Since(began)

	if !isDone {
		t.Errorf("Expected isDone=true when waitForDataFn returns false, got false")
	}

	// Lower bound: (maxCalls-1) pauses of 10ms each; finishing faster
	// suggests the loop is not pausing between polls.
	// Upper bound: 30ms per call, leaving headroom for scheduling overhead.
	// The two bounds cannot both be violated, so a switch is sufficient.
	lowerBound := time.Duration(maxCalls-1) * 10 * time.Millisecond
	upperBound := time.Duration(maxCalls) * 30 * time.Millisecond
	switch {
	case elapsed < lowerBound:
		t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests busy-waiting)", elapsed, lowerBound)
	case elapsed > upperBound:
		t.Errorf("Loop took too long: %v (expected < %v)", elapsed, upperBound)
	}

	// Snapshot the counter under the lock before asserting on it.
	mu.Lock()
	observedCalls := calls
	mu.Unlock()

	if observedCalls != maxCalls {
		t.Errorf("Expected exactly %d calls to waitForDataFn, got %d", maxCalls, observedCalls)
	}

	t.Logf("Loop exited cleanly in %v after %d iterations (no busy-waiting detected)", elapsed, observedCalls)
}

// TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk tests that the loop
// does not busy-wait when a disk reader is configured but has nothing to
// return. The test's intent (per its name) is to cover the
// ResumeFromDiskError path; whether that path is actually taken depends on
// LoopProcessLogDataWithOffset, which is not visible from this file.
func TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk(t *testing.T) {
	// Disk reader stub: reports the unchanged start position, not done, no error.
	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
		// No data on disk
		return startPosition, false, nil
	}
	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
	logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil)
	defer logBuffer.ShutdownLogBuffer()

	callCount := 0
	maxCalls := 5
	mu := sync.Mutex{}

	// Counts invocations under mu and signals a disconnect on the
	// maxCalls-th call so the loop terminates.
	waitForDataFn := func() bool {
		mu.Lock()
		defer mu.Unlock()
		callCount++
		// Disconnect after maxCalls
		return callCount < maxCalls
	}

	eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
		return true, nil
	}

	startPosition := NewMessagePositionFromOffset(0)
	startTime := time.Now()

	_, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)

	elapsed := time.Since(startTime)

	if !isDone {
		t.Errorf("Expected isDone=true when waitForDataFn returns false, got false")
	}

	// Should take at least (maxCalls-1) * 10ms, assuming the loop pauses
	// ~10ms per idle iteration on this path.
	minExpectedTime := time.Duration(maxCalls-1) * 10 * time.Millisecond
	if elapsed < minExpectedTime {
		t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests missing sleep)", elapsed, minExpectedTime)
	}

	// callCount is written under mu inside waitForDataFn, so read it under
	// the same lock (matching the EmptyBuffer test) instead of touching it
	// unguarded.
	mu.Lock()
	finalCallCount := callCount
	mu.Unlock()

	t.Logf("Loop exited cleanly in %v after %d iterations (proper sleep detected)", elapsed, finalCallCount)
}

// TestLoopProcessLogDataWithOffset_WithData covers the normal path: three
// messages are added to the buffer, and the loop is expected to deliver at
// least one of them to the entry callback and then finish promptly once
// waitForDataFn reports a disconnect.
func TestLoopProcessLogDataWithOffset_WithData(t *testing.T) {
	noopFlush := func(*LogBuffer, time.Time, time.Time, []byte, int64, int64) {}
	lb := NewLogBuffer("test", 1*time.Minute, noopFlush, nil, nil)
	defer lb.ShutdownLogBuffer()

	// Seed the buffer with a few messages.
	seed := []*mq_pb.DataMessage{
		{Key: []byte("key1"), Value: []byte("message1"), TsNs: 1},
		{Key: []byte("key2"), Value: []byte("message2"), TsNs: 2},
		{Key: []byte("key3"), Value: []byte("message3"), TsNs: 3},
	}
	for i := range seed {
		err := lb.AddToBuffer(seed[i])
		if err != nil {
			t.Fatalf("Failed to add message to buffer: %v", err)
		}
	}

	var (
		mu       sync.Mutex
		received int
	)

	// Keep waiting only while nothing has been delivered; as soon as the
	// first entry arrives this reports a disconnect.
	waitForDataFn := func() bool {
		mu.Lock()
		nothingYet := received == 0
		mu.Unlock()
		return nothingYet
	}

	// Count each delivered entry and ask the loop to continue.
	countEntry := func(*filer_pb.LogEntry, int64) (bool, error) {
		mu.Lock()
		defer mu.Unlock()
		received++
		return true, nil
	}

	from := NewMessagePositionFromOffset(0)
	began := time.Now()

	_, isDone, _ := lb.LoopProcessLogDataWithOffset("test-client", from, 0, waitForDataFn, countEntry)

	elapsed := time.Since(began)

	if !isDone {
		t.Errorf("Expected isDone=true after client disconnect, got false")
	}

	// Snapshot the counter under the lock before asserting on it.
	mu.Lock()
	total := received
	mu.Unlock()

	if total < 1 {
		t.Errorf("Expected to receive at least 1 message, got %d", total)
	}

	// Data is already buffered, so the call should return well within a second.
	if elapsed > 1*time.Second {
		t.Errorf("Processing took too long: %v (expected < 1s)", elapsed)
	}

	t.Logf("Successfully processed %d message(s) in %v", total, elapsed)
}

// TestLoopProcessLogDataWithOffset_ConcurrentDisconnect runs several readers
// against one shared buffer, each disconnecting after its own 50ms deadline,
// and fails if they have not all returned within five seconds.
func TestLoopProcessLogDataWithOffset_ConcurrentDisconnect(t *testing.T) {
	noopFlush := func(*LogBuffer, time.Time, time.Time, []byte, int64, int64) {}
	lb := NewLogBuffer("test", 1*time.Minute, noopFlush, nil, nil)
	defer lb.ShutdownLogBuffer()

	const numClients = 10
	var wg sync.WaitGroup
	wg.Add(numClients)

	for i := 0; i < numClients; i++ {
		go func() {
			defer wg.Done()

			// Each reader gets its own deadline that models the disconnect.
			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
			defer cancel()

			// True while the context is live, false once it has expired.
			stillConnected := func() bool {
				return ctx.Err() == nil
			}

			acceptEntry := func(*filer_pb.LogEntry, int64) (bool, error) {
				return true, nil
			}

			from := NewMessagePositionFromOffset(0)
			// Results are deliberately discarded: this test only checks
			// that every reader returns.
			_, _, _ = lb.LoopProcessLogDataWithOffset("test-client", from, 0, stillConnected, acceptEntry)
		}()
	}

	// Turn wg.Wait into a channel signal so it can be raced against a timer.
	allExited := make(chan struct{})
	go func() {
		defer close(allExited)
		wg.Wait()
	}()

	watchdog := time.NewTimer(5 * time.Second)
	defer watchdog.Stop()

	select {
	case <-allExited:
		t.Logf("All %d concurrent clients exited cleanly", numClients)
	case <-watchdog.C:
		t.Errorf("Timeout waiting for concurrent clients to exit (possible deadlock or stuck loop)")
	}
}

// TestLoopProcessLogDataWithOffset_StopTime verifies the handling of a
// stopTsNs that lies in the past: the loop must report isDone=true within a
// second and must never invoke the entry callback.
func TestLoopProcessLogDataWithOffset_StopTime(t *testing.T) {
	noopFlush := func(*LogBuffer, time.Time, time.Time, []byte, int64, int64) {}
	lb := NewLogBuffer("test", 1*time.Minute, noopFlush, nil, nil)
	defer lb.ShutdownLogBuffer()

	// Safety valve: report a disconnect on the 10th poll so a regression
	// cannot hang the test forever.
	polls := 0
	waitForDataFn := func() bool {
		polls++
		return polls < 10
	}

	// Any delivered entry is a failure; returning false asks the loop to stop.
	rejectEntry := func(*filer_pb.LogEntry, int64) (bool, error) {
		t.Errorf("Should not process any entries when stopTsNs is in the past")
		return false, nil
	}

	from := NewMessagePositionFromOffset(0)
	// One hour ago, expressed in Unix nanoseconds.
	stopTsNs := time.Now().Add(-time.Hour).UnixNano()

	began := time.Now()
	_, isDone, _ := lb.LoopProcessLogDataWithOffset("test-client", from, stopTsNs, waitForDataFn, rejectEntry)
	elapsed := time.Since(began)

	if !isDone {
		t.Errorf("Expected isDone=true when stopTsNs is in the past, got false")
	}

	if elapsed > time.Second {
		t.Errorf("Loop should exit quickly when stopTsNs is in the past, took %v", elapsed)
	}

	t.Logf("Loop correctly exited for past stopTsNs in %v (waitForDataFn called %d times)", elapsed, polls)
}

// BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer benchmarks the performance
// of the loop with an empty buffer to ensure no busy-waiting. Each iteration
// runs the loop until waitForDataFn reports a disconnect on its third call.
//
// Buffer construction and shutdown are kept out of the timed region so the
// reported ns/op reflects only LoopProcessLogDataWithOffset.
func BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer(b *testing.B) {
	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
	logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Stateless callback: build it once instead of once per iteration.
	eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
		return true, nil
	}

	// Exclude the setup above from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Per-iteration state: the counter must restart for every run.
		callCount := 0
		waitForDataFn := func() bool {
			callCount++
			return callCount < 3 // Exit after 3 calls
		}

		startPosition := NewMessagePositionFromOffset(0)
		// Results are intentionally discarded; only the elapsed time matters here.
		_, _, _ = logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
	}
	// Stop timing before the deferred shutdown runs.
	b.StopTimer()
}