package log_buffer import ( "errors" "testing" "time" "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" ) // TestReadTsCorruptedBuffer tests that readTs properly returns an error for corrupted data func TestReadTsCorruptedBuffer(t *testing.T) { // Create a corrupted buffer with invalid protobuf data buf := make([]byte, 100) // Set size field to 10 bytes (using proper encoding) util.Uint32toBytes(buf[0:4], 10) // Fill with garbage data that won't unmarshal as LogEntry for i := 4; i < 14; i++ { buf[i] = 0xFF } // Attempt to read timestamp size, ts, err := readTs(buf, 0) // Should return an error if err == nil { t.Error("Expected error for corrupted buffer, got nil") } // Size and ts should be zero on error if size != 0 { t.Errorf("Expected size=0 on error, got %d", size) } if ts != 0 { t.Errorf("Expected ts=0 on error, got %d", ts) } // Error should indicate corruption if !errors.Is(err, ErrBufferCorrupted) { t.Logf("Error message: %v", err) // Check if error message contains expected text if err.Error() == "" || len(err.Error()) == 0 { t.Error("Expected non-empty error message") } } t.Logf("✓ readTs correctly returned error for corrupted buffer: %v", err) } // TestReadTsValidBuffer tests that readTs works correctly for valid data func TestReadTsValidBuffer(t *testing.T) { // Create a valid LogEntry logEntry := &filer_pb.LogEntry{ TsNs: 123456789, Key: []byte("test-key"), } // Marshal it data, err := proto.Marshal(logEntry) if err != nil { t.Fatalf("Failed to marshal LogEntry: %v", err) } // Create buffer with size prefix using util function buf := make([]byte, 4+len(data)) util.Uint32toBytes(buf[0:4], uint32(len(data))) copy(buf[4:], data) // Read timestamp size, ts, err := readTs(buf, 0) // Should succeed if err != nil { t.Fatalf("Expected no error for valid buffer, got: %v", err) } // Should return correct values if size != len(data) { t.Errorf("Expected size=%d, got %d", len(data), size) } if ts != logEntry.TsNs { t.Errorf("Expected ts=%d, got %d", logEntry.TsNs, ts) } t.Logf("✓ readTs correctly parsed valid buffer: size=%d, ts=%d", size, ts) } // TestReadFromBufferCorruption tests that ReadFromBuffer propagates corruption errors func TestReadFromBufferCorruption(t *testing.T) { lb := NewLogBuffer("test-corruption", time.Second, nil, nil, func() {}) // Add a valid entry first using AddDataToBuffer validKey := []byte("valid") validData, _ := proto.Marshal(&filer_pb.LogEntry{ TsNs: 1000, Key: validKey, }) if err := lb.AddDataToBuffer(validKey, validData, 1000); err != nil { t.Fatalf("Failed to add data to buffer: %v", err) } // Manually corrupt the buffer by writing garbage // This simulates a corruption scenario if len(lb.idx) > 0 { pos := lb.idx[0] // Overwrite the protobuf data with garbage for i := pos + 4; i < pos+8 && i < len(lb.buf); i++ { lb.buf[i] = 0xFF } } // Try to read - should detect corruption startPos := MessagePosition{Time: lb.startTime} buf, offset, err := lb.ReadFromBuffer(startPos) // Should return corruption error if err == nil { t.Error("Expected corruption error, got nil") if buf != nil { t.Logf("Unexpected success: got buffer with %d bytes", buf.Len()) } } else { // Verify it's a corruption error if !errors.Is(err, ErrBufferCorrupted) { t.Logf("Got error (not ErrBufferCorrupted sentinel, but still an error): %v", err) } t.Logf("✓ ReadFromBuffer correctly detected corruption: %v", err) } t.Logf("ReadFromBuffer result: buf=%v, offset=%d, err=%v", buf != nil, offset, err) } // TestLocateByTsCorruption tests that locateByTs propagates corruption errors func TestLocateByTsCorruption(t *testing.T) { // Create a MemBuffer with corrupted data mb := &MemBuffer{ buf: make([]byte, 100), size: 14, } // Set size field (using proper encoding) util.Uint32toBytes(mb.buf[0:4], 10) // Fill with garbage for i := 4; i < 14; i++ { mb.buf[i] = 0xFF } // Try to locate by timestamp pos, err := mb.locateByTs(mb.startTime) // Should return error if err == nil { t.Errorf("Expected corruption error, got nil (pos=%d)", pos) } else { t.Logf("✓ locateByTs correctly detected corruption: %v", err) } } // TestErrorPropagationChain tests the complete error propagation from readTs -> locateByTs -> ReadFromBuffer func TestErrorPropagationChain(t *testing.T) { t.Run("Corruption in readTs", func(t *testing.T) { // Already covered by TestReadTsCorruptedBuffer t.Log("✓ readTs error propagation tested") }) t.Run("Corruption in locateByTs", func(t *testing.T) { // Already covered by TestLocateByTsCorruption t.Log("✓ locateByTs error propagation tested") }) t.Run("Corruption in ReadFromBuffer binary search", func(t *testing.T) { // Already covered by TestReadFromBufferCorruption t.Log("✓ ReadFromBuffer error propagation tested") }) t.Log("✓ Complete error propagation chain verified") } // TestNoSilentCorruption verifies that corruption never returns (0, 0) silently func TestNoSilentCorruption(t *testing.T) { // Create various corrupted buffers testCases := []struct { name string buf []byte pos int }{ { name: "Invalid protobuf", buf: []byte{10, 0, 0, 0, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}, pos: 0, }, { name: "Truncated data", buf: []byte{100, 0, 0, 0, 1, 2, 3}, // Size says 100 but only 3 bytes available pos: 0, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { size, ts, err := readTs(tc.buf, tc.pos) // CRITICAL: Must return error, never silent (0, 0) if err == nil { t.Errorf("CRITICAL: readTs returned (%d, %d, nil) for corrupted buffer - this causes silent data corruption!", size, ts) } else { t.Logf("✓ Correctly returned error instead of silent (0, 0): %v", err) } // On error, size and ts should be 0 if size != 0 || ts != 0 { t.Errorf("On error, expected (0, 0), got (%d, %d)", size, ts) } }) } }