1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
package integration
import (
"testing"
"time"
)
// TestAdaptiveFetchTimeout verifies that the adaptive timeout strategy
// allows reading multiple records from disk within a reasonable time
func TestAdaptiveFetchTimeout(t *testing.T) {
t.Log("Testing adaptive fetch timeout strategy...")
// Simulate the scenario where we need to read 4 records from disk
// Each record takes 100-200ms to read (simulates disk I/O)
recordReadTimes := []time.Duration{
150 * time.Millisecond, // Record 1 (from disk)
150 * time.Millisecond, // Record 2 (from disk)
150 * time.Millisecond, // Record 3 (from disk)
150 * time.Millisecond, // Record 4 (from disk)
}
// Test 1: Old strategy (50ms timeout per record)
t.Run("OldStrategy_50ms_Timeout", func(t *testing.T) {
timeout := 50 * time.Millisecond
recordsReceived := 0
start := time.Now()
for i, readTime := range recordReadTimes {
if readTime <= timeout {
recordsReceived++
} else {
t.Logf("Record %d timed out (readTime=%v > timeout=%v)", i+1, readTime, timeout)
break
}
}
duration := time.Since(start)
t.Logf("Old strategy: received %d/%d records in %v", recordsReceived, len(recordReadTimes), duration)
if recordsReceived >= len(recordReadTimes) {
t.Error("Old strategy should NOT receive all records (timeout too short)")
} else {
t.Logf("✓ Bug reproduced: old strategy times out too quickly")
}
})
// Test 2: New adaptive strategy (1 second timeout for first 5 records)
t.Run("NewStrategy_1s_Timeout", func(t *testing.T) {
timeout := 1 * time.Second // Generous timeout for first batch
recordsReceived := 0
start := time.Now()
for i, readTime := range recordReadTimes {
if readTime <= timeout {
recordsReceived++
t.Logf("Record %d received (readTime=%v)", i+1, readTime)
} else {
t.Logf("Record %d timed out (readTime=%v > timeout=%v)", i+1, readTime, timeout)
break
}
}
duration := time.Since(start)
t.Logf("New strategy: received %d/%d records in %v", recordsReceived, len(recordReadTimes), duration)
if recordsReceived < len(recordReadTimes) {
t.Errorf("New strategy should receive all records (timeout=%v)", timeout)
} else {
t.Logf("✓ Fix verified: new strategy receives all records")
}
})
// Test 3: Schema Registry catch-up scenario
t.Run("SchemaRegistry_CatchUp_Scenario", func(t *testing.T) {
// Schema Registry has 500ms total timeout to catch up from offset 3 to 6
schemaRegistryTimeout := 500 * time.Millisecond
// With old strategy (50ms per record after first):
// - First record: 10s timeout ✓
// - Records 2-4: 50ms each ✗ (times out after record 1)
// Total time: > 500ms (only gets 1 record per fetch)
// With new strategy (1s per record for first 5):
// - Records 1-4: 1s each ✓
// - All 4 records received in ~600ms
// Total time: ~600ms (gets all 4 records in one fetch)
recordsNeeded := 4
perRecordReadTime := 150 * time.Millisecond
// Old strategy simulation
oldStrategyTime := time.Duration(recordsNeeded) * 50 * time.Millisecond // Times out, need multiple fetches
oldStrategyRoundTrips := recordsNeeded // One record per fetch
// New strategy simulation
newStrategyTime := time.Duration(recordsNeeded) * perRecordReadTime // All in one fetch
newStrategyRoundTrips := 1
t.Logf("Schema Registry catch-up simulation:")
t.Logf(" Old strategy: %d round trips, ~%v total time", oldStrategyRoundTrips, oldStrategyTime*time.Duration(oldStrategyRoundTrips))
t.Logf(" New strategy: %d round trip, ~%v total time", newStrategyRoundTrips, newStrategyTime)
t.Logf(" Schema Registry timeout: %v", schemaRegistryTimeout)
oldStrategyTotalTime := oldStrategyTime * time.Duration(oldStrategyRoundTrips)
newStrategyTotalTime := newStrategyTime * time.Duration(newStrategyRoundTrips)
if oldStrategyTotalTime > schemaRegistryTimeout {
t.Logf("✓ Old strategy exceeds timeout: %v > %v", oldStrategyTotalTime, schemaRegistryTimeout)
}
if newStrategyTotalTime <= schemaRegistryTimeout+200*time.Millisecond {
t.Logf("✓ New strategy completes within timeout: %v <= %v", newStrategyTotalTime, schemaRegistryTimeout+200*time.Millisecond)
} else {
t.Errorf("New strategy too slow: %v > %v", newStrategyTotalTime, schemaRegistryTimeout)
}
})
}
// TestFetchTimeoutProgression verifies the timeout progression logic
func TestFetchTimeoutProgression(t *testing.T) {
t.Log("Testing fetch timeout progression...")
// Adaptive timeout logic:
// - First 5 records: 1 second (catch-up from disk)
// - After 5 records: 100ms (streaming from memory)
getTimeout := func(recordNumber int) time.Duration {
if recordNumber <= 5 {
return 1 * time.Second
}
return 100 * time.Millisecond
}
t.Logf("Timeout progression:")
for i := 1; i <= 10; i++ {
timeout := getTimeout(i)
t.Logf(" Record %2d: timeout = %v", i, timeout)
}
// Verify the progression
if getTimeout(1) != 1*time.Second {
t.Error("First record should have 1s timeout")
}
if getTimeout(5) != 1*time.Second {
t.Error("Fifth record should have 1s timeout")
}
if getTimeout(6) != 100*time.Millisecond {
t.Error("Sixth record should have 100ms timeout (fast path)")
}
if getTimeout(10) != 100*time.Millisecond {
t.Error("Tenth record should have 100ms timeout (fast path)")
}
t.Log("✓ Timeout progression is correct")
}
|