aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer_corruption_test.go
blob: 2f7a029e617a4f445a20126dc1de931faf356487 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package log_buffer

import (
	"errors"
	"testing"
	"time"

	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// TestReadTsCorruptedBuffer tests that readTs properly returns an error for corrupted data
func TestReadTsCorruptedBuffer(t *testing.T) {
	// Create a corrupted buffer with invalid protobuf data
	buf := make([]byte, 100)

	// Set size field to 10 bytes (using proper encoding)
	util.Uint32toBytes(buf[0:4], 10)

	// Fill with garbage data that won't unmarshal as LogEntry
	for i := 4; i < 14; i++ {
		buf[i] = 0xFF
	}

	// Attempt to read timestamp
	size, ts, err := readTs(buf, 0)

	// Should return an error
	if err == nil {
		t.Error("Expected error for corrupted buffer, got nil")
	}

	// Size and ts should be zero on error
	if size != 0 {
		t.Errorf("Expected size=0 on error, got %d", size)
	}

	if ts != 0 {
		t.Errorf("Expected ts=0 on error, got %d", ts)
	}

	// Error should indicate corruption
	if !errors.Is(err, ErrBufferCorrupted) {
		t.Logf("Error message: %v", err)
		// Check if error message contains expected text
		if err.Error() == "" || len(err.Error()) == 0 {
			t.Error("Expected non-empty error message")
		}
	}

	t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err)
}

// TestReadTsValidBuffer tests that readTs works correctly for valid data
func TestReadTsValidBuffer(t *testing.T) {
	// Create a valid LogEntry
	logEntry := &filer_pb.LogEntry{
		TsNs: 123456789,
		Key:  []byte("test-key"),
	}

	// Marshal it
	data, err := proto.Marshal(logEntry)
	if err != nil {
		t.Fatalf("Failed to marshal LogEntry: %v", err)
	}

	// Create buffer with size prefix using util function
	buf := make([]byte, 4+len(data))
	util.Uint32toBytes(buf[0:4], uint32(len(data)))
	copy(buf[4:], data)

	// Read timestamp
	size, ts, err := readTs(buf, 0)

	// Should succeed
	if err != nil {
		t.Fatalf("Expected no error for valid buffer, got: %v", err)
	}

	// Should return correct values
	if size != len(data) {
		t.Errorf("Expected size=%d, got %d", len(data), size)
	}

	if ts != logEntry.TsNs {
		t.Errorf("Expected ts=%d, got %d", logEntry.TsNs, ts)
	}

	t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", size, ts)
}

// TestReadFromBufferCorruption tests that ReadFromBuffer propagates corruption errors
func TestReadFromBufferCorruption(t *testing.T) {
	lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {})

	// Add a valid entry first using AddDataToBuffer
	validKey := []byte("valid")
	validData, _ := proto.Marshal(&filer_pb.LogEntry{
		TsNs: 1000,
		Key:  validKey,
	})
	if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil {
		t.Fatalf("Failed to add data to buffer: %v", err)
	}

	// Manually corrupt the buffer by writing garbage
	// This simulates a corruption scenario
	if len(lb.idx) > 0 {
		pos := lb.idx[0]
		// Overwrite the protobuf data with garbage
		for i := pos + 4; i < pos+8 && i < len(lb.buf); i++ {
			lb.buf[i] = 0xFF
		}
	}

	// Try to read - should detect corruption
	startPos := MessagePosition{Time: lb.startTime}
	buf, offset, err := lb.ReadFromBuffer(startPos)

	// Should return corruption error
	if err == nil {
		t.Error("Expected corruption error, got nil")
		if buf != nil {
			t.Logf("Unexpected success: got buffer with %d bytes", buf.Len())
		}
	} else {
		// Verify it's a corruption error
		if !errors.Is(err, ErrBufferCorrupted) {
			t.Logf("Got error (not ErrBufferCorrupted sentinel, but still an error): %v", err)
		}
		t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err)
	}

	t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err)
}

// TestLocateByTsCorruption tests that locateByTs propagates corruption errors
func TestLocateByTsCorruption(t *testing.T) {
	// Create a MemBuffer with corrupted data
	mb := &MemBuffer{
		buf:  make([]byte, 100),
		size: 14,
	}

	// Set size field (using proper encoding)
	util.Uint32toBytes(mb.buf[0:4], 10)

	// Fill with garbage
	for i := 4; i < 14; i++ {
		mb.buf[i] = 0xFF
	}

	// Try to locate by timestamp
	pos, err := mb.locateByTs(mb.startTime)

	// Should return error
	if err == nil {
		t.Errorf("Expected corruption error, got nil (pos=%d)", pos)
	} else {
		t.Logf("✓ locateByTs correctly detected corruption: %v", err)
	}
}

// TestErrorPropagationChain tests the complete error propagation from readTs -> locateByTs -> ReadFromBuffer
func TestErrorPropagationChain(t *testing.T) {
	t.Run("Corruption in readTs", func(t *testing.T) {
		// Already covered by TestReadTsCorruptedBuffer
		t.Log("✓ readTs error propagation tested")
	})

	t.Run("Corruption in locateByTs", func(t *testing.T) {
		// Already covered by TestLocateByTsCorruption
		t.Log("✓ locateByTs error propagation tested")
	})

	t.Run("Corruption in ReadFromBuffer binary search", func(t *testing.T) {
		// Already covered by TestReadFromBufferCorruption
		t.Log("✓ ReadFromBuffer error propagation tested")
	})

	t.Log("✓ Complete error propagation chain verified")
}

// TestNoSilentCorruption verifies that corruption never returns (0, 0) silently
func TestNoSilentCorruption(t *testing.T) {
	// Create various corrupted buffers
	testCases := []struct {
		name string
		buf  []byte
		pos  int
	}{
		{
			name: "Invalid protobuf",
			buf:  []byte{10, 0, 0, 0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF},
			pos:  0,
		},
		{
			name: "Truncated data",
			buf:  []byte{100, 0, 0, 0, 1, 2, 3}, // Size says 100 but only 3 bytes available
			pos:  0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			size, ts, err := readTs(tc.buf, tc.pos)

			// CRITICAL: Must return error, never silent (0, 0)
			if err == nil {
				t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts)
			} else {
				t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err)
			}

			// On error, size and ts should be 0
			if size != 0 || ts != 0 {
				t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts)
			}
		})
	}
}