1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
package log_buffer
import (
"errors"
"testing"
"time"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestReadTsCorruptedBuffer checks that readTs reports an error, together with
// a zero size and a zero timestamp, when the framed payload is garbage.
func TestReadTsCorruptedBuffer(t *testing.T) {
	// Frame: a 4-byte size prefix (written with the util helper) announcing a
	// 10-byte payload, followed by 10 bytes of 0xFF.
	buf := make([]byte, 100)
	util.Uint32toBytes(buf[0:4], 10)
	for i := 4; i < 14; i++ {
		buf[i] = 0xFF
	}

	size, ts, err := readTs(buf, 0)

	// Stop immediately on a nil error: the checks below call err.Error(),
	// which would panic on a nil error instead of reporting a test failure.
	if err == nil {
		t.Fatal("Expected error for corrupted buffer, got nil")
	}

	// Both results must be zeroed when an error is returned.
	if size != 0 {
		t.Errorf("Expected size=0 on error, got %d", size)
	}
	if ts != 0 {
		t.Errorf("Expected ts=0 on error, got %d", ts)
	}

	// Matching the ErrBufferCorrupted sentinel is not required by this test;
	// a non-matching error is only logged, but it must carry a message.
	if !errors.Is(err, ErrBufferCorrupted) {
		t.Logf("Error message: %v", err)
		if err.Error() == "" {
			t.Error("Expected non-empty error message")
		}
	}
	t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err)
}
// TestReadTsValidBuffer covers the success path of readTs: a size-prefixed,
// marshalled LogEntry must yield the payload length and the entry timestamp.
func TestReadTsValidBuffer(t *testing.T) {
	want := &filer_pb.LogEntry{
		TsNs: 123456789,
		Key:  []byte("test-key"),
	}

	payload, marshalErr := proto.Marshal(want)
	if marshalErr != nil {
		t.Fatalf("Failed to marshal LogEntry: %v", marshalErr)
	}

	// Frame layout: 4-byte size prefix (util helper) followed by the payload.
	framed := make([]byte, 4+len(payload))
	util.Uint32toBytes(framed[:4], uint32(len(payload)))
	copy(framed[4:], payload)

	gotSize, gotTs, readErr := readTs(framed, 0)
	if readErr != nil {
		t.Fatalf("Expected no error for valid buffer, got: %v", readErr)
	}

	// The reported size is the payload length, excluding the 4-byte prefix.
	if wantSize := len(payload); gotSize != wantSize {
		t.Errorf("Expected size=%d, got %d", wantSize, gotSize)
	}
	if wantTs := want.TsNs; gotTs != wantTs {
		t.Errorf("Expected ts=%d, got %d", wantTs, gotTs)
	}
	t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", gotSize, gotTs)
}
// TestReadFromBufferCorruption adds one entry to a LogBuffer, overwrites part
// of the stored bytes with 0xFF, and expects ReadFromBuffer to return an error.
func TestReadFromBufferCorruption(t *testing.T) {
	lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {})

	// Add a valid entry first using AddDataToBuffer.
	validKey := []byte("valid")
	validData, err := proto.Marshal(&filer_pb.LogEntry{
		TsNs: 1000,
		Key:  validKey,
	})
	// Do not continue with unusable input: a marshal failure here would make
	// any later failure in this test misleading.
	if err != nil {
		t.Fatalf("Failed to marshal LogEntry: %v", err)
	}
	if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil {
		t.Fatalf("Failed to add data to buffer: %v", err)
	}

	// Simulate corruption by overwriting up to four bytes starting 4 bytes past
	// the first indexed position.
	// NOTE(review): the +4 presumably skips the entry's size prefix — confirm
	// against the LogBuffer layout.
	if len(lb.idx) > 0 {
		pos := lb.idx[0]
		for i := pos + 4; i < pos+8 && i < len(lb.buf); i++ {
			lb.buf[i] = 0xFF
		}
	}

	// Try to read from the buffer's start time - should detect corruption.
	startPos := MessagePosition{Time: lb.startTime}
	buf, offset, err := lb.ReadFromBuffer(startPos)

	if err == nil {
		t.Error("Expected corruption error, got nil")
		if buf != nil {
			t.Logf("Unexpected success: got buffer with %d bytes", buf.Len())
		}
	} else {
		// Any error is accepted; a mismatch with the ErrBufferCorrupted
		// sentinel is only logged, not treated as a failure.
		if !errors.Is(err, ErrBufferCorrupted) {
			t.Logf("Got error (not ErrBufferCorrupted sentinel, but still an error): %v", err)
		}
		t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err)
	}
	t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err)
}
// TestLocateByTsCorruption hands locateByTs a MemBuffer whose single framed
// entry has a payload of 0xFF bytes and expects an error in return.
func TestLocateByTsCorruption(t *testing.T) {
	corrupted := &MemBuffer{
		buf:  make([]byte, 100),
		size: 14,
	}

	// 4-byte size prefix (util helper) announcing 10 bytes, then 10 bytes of 0xFF.
	util.Uint32toBytes(corrupted.buf[:4], 10)
	garbage := corrupted.buf[4:14]
	for i := range garbage {
		garbage[i] = 0xFF
	}

	// startTime is not set by the literal above, so the lookup uses its zero value.
	foundAt, locateErr := corrupted.locateByTs(corrupted.startTime)
	if locateErr != nil {
		t.Logf("✓ locateByTs correctly detected corruption: %v", locateErr)
		return
	}
	t.Errorf("Expected corruption error, got nil (pos=%d)", foundAt)
}
// TestErrorPropagationChain names the stages of the readTs -> locateByTs ->
// ReadFromBuffer error path as subtests. The subtests only log a message; the
// actual assertions live in TestReadTsCorruptedBuffer, TestLocateByTsCorruption
// and TestReadFromBufferCorruption.
func TestErrorPropagationChain(t *testing.T) {
	stages := []struct {
		name string
		note string
	}{
		// Already covered by TestReadTsCorruptedBuffer
		{"Corruption in readTs", "✓ readTs error propagation tested"},
		// Already covered by TestLocateByTsCorruption
		{"Corruption in locateByTs", "✓ locateByTs error propagation tested"},
		// Already covered by TestReadFromBufferCorruption
		{"Corruption in ReadFromBuffer binary search", "✓ ReadFromBuffer error propagation tested"},
	}

	for _, stage := range stages {
		t.Run(stage.name, func(t *testing.T) {
			t.Log(stage.note)
		})
	}
	t.Log("✓ Complete error propagation chain verified")
}
// TestNoSilentCorruption verifies that corruption never returns (0, 0) silently
func TestNoSilentCorruption(t *testing.T) {
// Create various corrupted buffers
testCases := []struct {
name string
buf []byte
pos int
}{
{
name: "Invalid protobuf",
buf: []byte{10, 0, 0, 0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
pos: 0,
},
{
name: "Truncated data",
buf: []byte{100, 0, 0, 0, 1, 2, 3}, // Size says 100 but only 3 bytes available
pos: 0,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
size, ts, err := readTs(tc.buf, tc.pos)
// CRITICAL: Must return error, never silent (0, 0)
if err == nil {
t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts)
} else {
t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err)
}
// On error, size and ts should be 0
if size != 0 || ts != 0 {
t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts)
}
})
}
}
|