diff options
Diffstat (limited to 'weed/mq/logstore/write_rows_no_panic_test.go')
| -rw-r--r-- | weed/mq/logstore/write_rows_no_panic_test.go | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/weed/mq/logstore/write_rows_no_panic_test.go b/weed/mq/logstore/write_rows_no_panic_test.go new file mode 100644 index 000000000..4e40b6d09 --- /dev/null +++ b/weed/mq/logstore/write_rows_no_panic_test.go @@ -0,0 +1,118 @@ +package logstore + +import ( + "os" + "testing" + + parquet "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// TestWriteRowsNoPanic builds a representative schema and rows and ensures WriteRows completes without panic. +func TestWriteRowsNoPanic(t *testing.T) { + // Build schema similar to ecommerce.user_events + recordType := schema.RecordTypeBegin(). + WithField("id", schema.TypeInt64). + WithField("user_id", schema.TypeInt64). + WithField("user_type", schema.TypeString). + WithField("action", schema.TypeString). + WithField("status", schema.TypeString). + WithField("amount", schema.TypeDouble). + WithField("timestamp", schema.TypeString). + WithField("metadata", schema.TypeString). + RecordTypeEnd() + + // Add log columns + recordType = schema.NewRecordTypeBuilder(recordType). + WithField(SW_COLUMN_NAME_TS, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + RecordTypeEnd() + + ps, err := schema.ToParquetSchema("synthetic", recordType) + if err != nil { + t.Fatalf("schema: %v", err) + } + levels, err := schema.ToParquetLevels(recordType) + if err != nil { + t.Fatalf("levels: %v", err) + } + + tmp, err := os.CreateTemp(".", "synthetic*.parquet") + if err != nil { + t.Fatalf("tmp: %v", err) + } + defer func() { + tmp.Close() + os.Remove(tmp.Name()) + }() + + w := parquet.NewWriter(tmp, ps, + parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}), + parquet.DataPageStatistics(true), + ) + defer w.Close() + + rb := parquet.NewRowBuilder(ps) + var rows []parquet.Row + + // Build a few hundred rows with various optional/missing values and nil/empty keys + for i := 0; i < 200; i++ { + rb.Reset() + + rec := &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{}} + // Required-like fields present + rec.Fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1000 + i)}} + rec.Fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(i)}} + rec.Fields["user_type"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "standard"}} + rec.Fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "click"}} + rec.Fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "active"}} + + // Optional fields vary: sometimes omitted, sometimes empty + if i%3 == 0 { + rec.Fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: float64(i)}} + } + if i%4 == 0 { + rec.Fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: ""}} + } + if i%5 == 0 { + rec.Fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "2025-09-03T15:36:29Z"}} + } + + // Log columns + rec.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1756913789000000000 + i)}} + var keyBytes []byte + if i%7 == 0 { + keyBytes = nil // ensure nil-keys are handled + } else if i%7 == 1 { + keyBytes = []byte{} // empty + } else { + keyBytes = []byte("key-") + } + rec.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: keyBytes}} + + if err := schema.AddRecordValue(rb, recordType, levels, rec); err != nil { + t.Fatalf("add record: %v", err) + } + rows = append(rows, rb.Row()) + } + + deferredPanicked := false + defer func() { + if r := recover(); r != nil { + deferredPanicked = true + t.Fatalf("unexpected panic: %v", r) + } + }() + + if _, err := w.WriteRows(rows); err != nil { + t.Fatalf("WriteRows: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + if deferredPanicked { + t.Fatal("panicked") + } +} |
