diff options
Diffstat (limited to 'weed/mq/kafka/gateway/test_mock_handler.go')
| -rw-r--r-- | weed/mq/kafka/gateway/test_mock_handler.go | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go index 4bb0e28b1..c01aac970 100644 --- a/weed/mq/kafka/gateway/test_mock_handler.go +++ b/weed/mq/kafka/gateway/test_mock_handler.go @@ -98,7 +98,11 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop return info, exists } -func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) { +func (m *mockSeaweedMQHandler) InvalidateTopicExistsCache(topic string) { + // Mock handler doesn't cache topic existence, so this is a no-op +} + +func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) { m.mu.Lock() defer m.mu.Unlock() @@ -117,6 +121,7 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32 offset := m.offsets[topicName][partitionID] m.offsets[topicName][partitionID]++ + // Store record record := &mockRecord{ key: key, @@ -128,8 +133,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32 return offset, nil } -func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { - return m.ProduceRecord(topicName, partitionID, key, recordValueBytes) +func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { + return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes) } func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) { |
