aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/catalog.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/catalog.go')
-rw-r--r--weed/query/engine/catalog.go40
1 files changed, 36 insertions, 4 deletions
diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go
index 4cd39f3f0..f53e4cb2a 100644
--- a/weed/query/engine/catalog.go
+++ b/weed/query/engine/catalog.go
@@ -17,9 +17,9 @@ import (
type BrokerClientInterface interface {
ListNamespaces(ctx context.Context) ([]string, error)
ListTopics(ctx context.Context, namespace string) ([]string, error)
- GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error)
+ GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) // Returns (flatSchema, keyColumns, schemaFormat, error)
+ ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error
GetFilerClient() (filer_pb.FilerClient, error)
- ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
DeleteTopic(ctx context.Context, namespace, topicName string) error
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
// This prevents double-counting when combining with disk-based data
@@ -151,12 +151,24 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
tables := make([]string, 0, len(db.Tables))
for name := range db.Tables {
+ // Skip .meta table
+ if name == ".meta" {
+ continue
+ }
tables = append(tables, name)
}
return tables, nil
}
- return topics, nil
+ // Filter out .meta table from topics
+ filtered := make([]string, 0, len(topics))
+ for _, topic := range topics {
+ if topic != ".meta" {
+ filtered = append(filtered, topic)
+ }
+ }
+
+ return filtered, nil
}
// GetTableInfo returns detailed schema information for a table
@@ -185,7 +197,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table)
+ recordType, _, _, err := c.brokerClient.GetTopicSchema(ctx, database, table)
if err != nil {
// If broker unavailable and we have expired cached data, return it
if exists {
@@ -278,7 +290,27 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch
// 1. MQ scalar types map directly to SQL types
// 2. Complex types (arrays, maps) are serialized as JSON strings
// 3. All fields are nullable unless specifically marked otherwise
+// 4. If no schema is defined, create a default schema with system fields and _value
func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
+ // Check if the schema has a valid RecordType
+ if mqSchema == nil || mqSchema.RecordType == nil {
+ // For topics without schema, create a default schema with system fields and _value
+ columns := []ColumnInfo{
+ {Name: SW_DISPLAY_NAME_TIMESTAMP, Type: "TIMESTAMP", Nullable: true},
+ {Name: SW_COLUMN_NAME_KEY, Type: "VARBINARY", Nullable: true},
+ {Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(255)", Nullable: true},
+ {Name: SW_COLUMN_NAME_VALUE, Type: "VARBINARY", Nullable: true},
+ }
+
+ return &TableInfo{
+ Name: topicName,
+ Namespace: namespace,
+ Schema: nil, // No schema defined
+ Columns: columns,
+ RevisionId: 0,
+ }, nil
+ }
+
columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
for i, field := range mqSchema.RecordType.Fields {