diff options
Diffstat (limited to 'weed/mq/kafka/protocol/metadata_blocking_test.go')
| -rw-r--r-- | weed/mq/kafka/protocol/metadata_blocking_test.go | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/weed/mq/kafka/protocol/metadata_blocking_test.go b/weed/mq/kafka/protocol/metadata_blocking_test.go index 403489210..e5dfd1f95 100644 --- a/weed/mq/kafka/protocol/metadata_blocking_test.go +++ b/weed/mq/kafka/protocol/metadata_blocking_test.go @@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo return nil, false } -func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) { +func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } -func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { +func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } @@ -199,6 +199,10 @@ func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler // No-op } +func (h *FastMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *FastMockHandler) Close() error { return nil } @@ -234,11 +238,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic return nil, false } -func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) { +func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } -func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { +func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } @@ -270,6 +274,10 @@ func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHan // No-op } +func (h *BlockingMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *BlockingMockHandler) Close() error { return nil } @@ -320,11 +328,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT return nil, false } -func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) { +func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } -func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { +func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { return 0, fmt.Errorf("not implemented") } @@ -356,6 +364,10 @@ func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.Protoco // No-op } +func (h *TimeoutAwareMockHandler) InvalidateTopicExistsCache(topic string) { + // No-op for mock +} + func (h *TimeoutAwareMockHandler) Close() error { return nil } |
