diff options
Diffstat (limited to 'weed/query/engine/mocks_test.go')
| -rw-r--r-- | weed/query/engine/mocks_test.go | 69 |
1 files changed, 39 insertions, 30 deletions
diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go index 733d99af7..2f72ed9ed 100644 --- a/weed/query/engine/mocks_test.go +++ b/weed/query/engine/mocks_test.go @@ -879,17 +879,51 @@ func (m *MockBrokerClient) ListTopics(ctx context.Context, namespace string) ([] return []string{}, nil } -// GetTopicSchema returns the mock schema for a topic -func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) { +// GetTopicSchema returns flat schema and key columns for a topic +func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) { if m.shouldFail { - return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + return nil, nil, "", fmt.Errorf("mock broker failure: %s", m.failMessage) } key := fmt.Sprintf("%s.%s", namespace, topic) if schema, exists := m.schemas[key]; exists { - return schema, nil + // For testing, assume first field is key column + var keyColumns []string + if len(schema.Fields) > 0 { + keyColumns = []string{schema.Fields[0].Name} + } + return schema, keyColumns, "", nil // Schema format empty for mocks + } + return nil, nil, "", fmt.Errorf("topic %s not found", key) +} + +// ConfigureTopic creates or modifies a topic using flat schema format +func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error { + if m.shouldFail { + return fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + // Store the schema for future retrieval + key := fmt.Sprintf("%s.%s", namespace, topicName) + m.schemas[key] = flatSchema + + // Add topic to namespace if it doesn't exist + if topics, exists := m.topics[namespace]; exists { + found := false + for _, t := range topics { + if t == topicName { + found = true + break + } + } + if !found { + m.topics[namespace] = append(topics, topicName) + } + } else { + m.topics[namespace] = []string{topicName} } - return nil, fmt.Errorf("topic %s not found", key) + + return nil } // GetFilerClient returns a mock filer client @@ -960,31 +994,6 @@ func (t *TestHybridMessageScanner) ScanMessages(ctx context.Context, options Hyb return generateSampleHybridData(t.topicName, options), nil } -// ConfigureTopic creates or updates a topic configuration (mock implementation) -func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error { - if m.shouldFail { - return fmt.Errorf("mock broker failure: %s", m.failMessage) - } - - // Store the schema in our mock data - key := fmt.Sprintf("%s.%s", namespace, topicName) - m.schemas[key] = recordType - - // Add to topics list if not already present - if topics, exists := m.topics[namespace]; exists { - for _, topic := range topics { - if topic == topicName { - return nil // Already exists - } - } - m.topics[namespace] = append(topics, topicName) - } else { - m.topics[namespace] = []string{topicName} - } - - return nil -} - // DeleteTopic removes a topic and all its data (mock implementation) func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error { if m.shouldFail { |
