diff options
Diffstat (limited to 'weed/util/log_buffer/log_buffer_queryability_test.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer_queryability_test.go | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go new file mode 100644 index 000000000..6e372d2b3 --- /dev/null +++ b/weed/util/log_buffer/log_buffer_queryability_test.go @@ -0,0 +1,238 @@ +package log_buffer + +import ( + "bytes" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +// TestBufferQueryability tests that data written to the buffer can be immediately queried +func TestBufferQueryability(t *testing.T) { + // Create a log buffer with a long flush interval to prevent premature flushing + logBuffer := NewLogBuffer("test-buffer", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Mock flush function - do nothing to keep data in memory + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + // Mock read from disk function + return startPosition, false, nil + }, + func() { + // Mock notify function + }) + + // Test data similar to schema registry messages + testKey := []byte(`{"keytype":"SCHEMA","subject":"test-topic-value","version":1,"magic":1}`) + testValue := []byte(`{"subject":"test-topic-value","version":1,"id":1,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`) + + // Create a LogEntry with offset (simulating the schema registry scenario) + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + PartitionKeyHash: 12345, + Data: testValue, + Key: testKey, + Offset: 1, + } + + // Add the entry to the buffer + logBuffer.AddLogEntryToBuffer(logEntry) + + // Verify the buffer has data + if logBuffer.pos == 0 { + t.Fatal("Buffer should have data after adding entry") + } + + // Test immediate queryability - read from buffer starting from beginning + startPosition := NewMessagePosition(0, 0) // Start from beginning + bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("ReadFromBuffer failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("ReadFromBuffer returned nil buffer - data should be queryable immediately") + } + + if batchIndex != 1 { + t.Errorf("Expected batchIndex=1, got %d", batchIndex) + } + + // Verify we can read the data back + buf := bufferCopy.Bytes() + if len(buf) == 0 { + t.Fatal("Buffer copy is empty") + } + + // Parse the first entry from the buffer + if len(buf) < 4 { + t.Fatal("Buffer too small to contain entry size") + } + + size := util.BytesToUint32(buf[0:4]) + if len(buf) < 4+int(size) { + t.Fatalf("Buffer too small to contain entry data: need %d, have %d", 4+int(size), len(buf)) + } + + entryData := buf[4 : 4+int(size)] + + // Unmarshal and verify the entry + retrievedEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, retrievedEntry); err != nil { + t.Fatalf("Failed to unmarshal retrieved entry: %v", err) + } + + // Verify the data matches + if !bytes.Equal(retrievedEntry.Key, testKey) { + t.Errorf("Key mismatch: expected %s, got %s", string(testKey), string(retrievedEntry.Key)) + } + + if !bytes.Equal(retrievedEntry.Data, testValue) { + t.Errorf("Value mismatch: expected %s, got %s", string(testValue), string(retrievedEntry.Data)) + } + + if retrievedEntry.Offset != 1 { + t.Errorf("Offset mismatch: expected 1, got %d", retrievedEntry.Offset) + } + + t.Logf("Buffer queryability test passed - data is immediately readable") +} + +// TestMultipleEntriesQueryability tests querying multiple entries from buffer +func TestMultipleEntriesQueryability(t *testing.T) { + logBuffer := NewLogBuffer("test-multi-buffer", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Mock flush function + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + return startPosition, false, nil + }, + func() {}) + + // Add multiple entries + for i := 1; i <= 3; i++ { + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano() + int64(i*1000), // Ensure different timestamps + PartitionKeyHash: int32(i), + Data: []byte("test-data-" + string(rune('0'+i))), + Key: []byte("test-key-" + string(rune('0'+i))), + Offset: int64(i), + } + logBuffer.AddLogEntryToBuffer(logEntry) + } + + // Read all entries + startPosition := NewMessagePosition(0, 0) + bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("ReadFromBuffer failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("ReadFromBuffer returned nil buffer") + } + + if batchIndex != 3 { + t.Errorf("Expected batchIndex=3, got %d", batchIndex) + } + + // Count entries in buffer + buf := bufferCopy.Bytes() + entryCount := 0 + pos := 0 + + for pos+4 < len(buf) { + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + break + } + + entryData := buf[pos+4 : pos+4+int(size)] + entry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, entry); err != nil { + t.Fatalf("Failed to unmarshal entry %d: %v", entryCount+1, err) + } + + entryCount++ + pos += 4 + int(size) + + t.Logf("Entry %d: Key=%s, Data=%s, Offset=%d", entryCount, string(entry.Key), string(entry.Data), entry.Offset) + } + + if entryCount != 3 { + t.Errorf("Expected 3 entries, found %d", entryCount) + } + + t.Logf("Multiple entries queryability test passed - found %d entries", entryCount) +} + +// TestSchemaRegistryScenario tests the specific scenario that was failing +func TestSchemaRegistryScenario(t *testing.T) { + logBuffer := NewLogBuffer("_schemas", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Mock flush function - simulate what happens in real scenario + t.Logf("FLUSH: startTime=%v, stopTime=%v, bufSize=%d, minOffset=%d, maxOffset=%d", + startTime, stopTime, len(buf), minOffset, maxOffset) + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + return startPosition, false, nil + }, + func() {}) + + // Simulate schema registry message + schemaKey := []byte(`{"keytype":"SCHEMA","subject":"test-schema-value","version":1,"magic":1}`) + schemaValue := []byte(`{"subject":"test-schema-value","version":1,"id":12,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`) + + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + PartitionKeyHash: 12345, + Data: schemaValue, + Key: schemaKey, + Offset: 0, // First message + } + + // Add to buffer + logBuffer.AddLogEntryToBuffer(logEntry) + + // Simulate the SQL query scenario - read from offset 0 + startPosition := NewMessagePosition(0, 0) + bufferCopy, _, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("Schema registry scenario failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("Schema registry scenario: ReadFromBuffer returned nil - this is the bug!") + } + + // Verify schema data is readable + buf := bufferCopy.Bytes() + if len(buf) < 4 { + t.Fatal("Buffer too small") + } + + size := util.BytesToUint32(buf[0:4]) + entryData := buf[4 : 4+int(size)] + + retrievedEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, retrievedEntry); err != nil { + t.Fatalf("Failed to unmarshal schema entry: %v", err) + } + + // Verify schema value is preserved + if !bytes.Equal(retrievedEntry.Data, schemaValue) { + t.Errorf("Schema value lost! Expected: %s, Got: %s", string(schemaValue), string(retrievedEntry.Data)) + } + + if len(retrievedEntry.Data) != len(schemaValue) { + t.Errorf("Schema value length mismatch! Expected: %d, Got: %d", len(schemaValue), len(retrievedEntry.Data)) + } + + t.Logf("Schema registry scenario test passed - schema value preserved: %d bytes", len(retrievedEntry.Data)) +} |
