aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/metadata_blocking_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/metadata_blocking_test.go')
-rw-r--r--weed/mq/kafka/protocol/metadata_blocking_test.go24
1 files changed, 18 insertions, 6 deletions
diff --git a/weed/mq/kafka/protocol/metadata_blocking_test.go b/weed/mq/kafka/protocol/metadata_blocking_test.go
index 403489210..e5dfd1f95 100644
--- a/weed/mq/kafka/protocol/metadata_blocking_test.go
+++ b/weed/mq/kafka/protocol/metadata_blocking_test.go
@@ -163,11 +163,11 @@ func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo
return nil, false
}
-func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *FastMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
-func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *FastMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
@@ -199,6 +199,10 @@ func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler
// No-op
}
+func (h *FastMockHandler) InvalidateTopicExistsCache(topic string) {
+ // No-op for mock
+}
+
func (h *FastMockHandler) Close() error {
return nil
}
@@ -234,11 +238,11 @@ func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopic
return nil, false
}
-func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *BlockingMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
-func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *BlockingMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
@@ -270,6 +274,10 @@ func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHan
// No-op
}
+func (h *BlockingMockHandler) InvalidateTopicExistsCache(topic string) {
+ // No-op for mock
+}
+
func (h *BlockingMockHandler) Close() error {
return nil
}
@@ -320,11 +328,11 @@ func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaT
return nil, false
}
-func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (h *TimeoutAwareMockHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
-func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+func (h *TimeoutAwareMockHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
return 0, fmt.Errorf("not implemented")
}
@@ -356,6 +364,10 @@ func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.Protoco
// No-op
}
+func (h *TimeoutAwareMockHandler) InvalidateTopicExistsCache(topic string) {
+ // No-op for mock
+}
+
func (h *TimeoutAwareMockHandler) Close() error {
return nil
}