aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/mocks_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/mocks_test.go')
-rw-r--r--weed/query/engine/mocks_test.go69
1 files changed, 39 insertions, 30 deletions
diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go
index 733d99af7..2f72ed9ed 100644
--- a/weed/query/engine/mocks_test.go
+++ b/weed/query/engine/mocks_test.go
@@ -879,17 +879,51 @@ func (m *MockBrokerClient) ListTopics(ctx context.Context, namespace string) ([]
return []string{}, nil
}
-// GetTopicSchema returns the mock schema for a topic
-func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) {
+// GetTopicSchema returns flat schema and key columns for a topic
+func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) {
if m.shouldFail {
- return nil, fmt.Errorf("mock broker failure: %s", m.failMessage)
+ return nil, nil, "", fmt.Errorf("mock broker failure: %s", m.failMessage)
}
key := fmt.Sprintf("%s.%s", namespace, topic)
if schema, exists := m.schemas[key]; exists {
- return schema, nil
+ // For testing, assume first field is key column
+ var keyColumns []string
+ if len(schema.Fields) > 0 {
+ keyColumns = []string{schema.Fields[0].Name}
+ }
+ return schema, keyColumns, "", nil // Schema format empty for mocks
+ }
+ return nil, nil, "", fmt.Errorf("topic %s not found", key)
+}
+
+// ConfigureTopic creates or modifies a topic using flat schema format
+func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
+ if m.shouldFail {
+ return fmt.Errorf("mock broker failure: %s", m.failMessage)
+ }
+
+ // Store the schema for future retrieval
+ key := fmt.Sprintf("%s.%s", namespace, topicName)
+ m.schemas[key] = flatSchema
+
+ // Add topic to namespace if it doesn't exist
+ if topics, exists := m.topics[namespace]; exists {
+ found := false
+ for _, t := range topics {
+ if t == topicName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ m.topics[namespace] = append(topics, topicName)
+ }
+ } else {
+ m.topics[namespace] = []string{topicName}
}
- return nil, fmt.Errorf("topic %s not found", key)
+
+ return nil
}
// GetFilerClient returns a mock filer client
@@ -960,31 +994,6 @@ func (t *TestHybridMessageScanner) ScanMessages(ctx context.Context, options Hyb
return generateSampleHybridData(t.topicName, options), nil
}
-// ConfigureTopic creates or updates a topic configuration (mock implementation)
-func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
- if m.shouldFail {
- return fmt.Errorf("mock broker failure: %s", m.failMessage)
- }
-
- // Store the schema in our mock data
- key := fmt.Sprintf("%s.%s", namespace, topicName)
- m.schemas[key] = recordType
-
- // Add to topics list if not already present
- if topics, exists := m.topics[namespace]; exists {
- for _, topic := range topics {
- if topic == topicName {
- return nil // Already exists
- }
- }
- m.topics[namespace] = append(topics, topicName)
- } else {
- m.topics[namespace] = []string{topicName}
- }
-
- return nil
-}
-
// DeleteTopic removes a topic and all its data (mock implementation)
func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error {
if m.shouldFail {