aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/gateway/test_mock_handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/gateway/test_mock_handler.go')
-rw-r--r--weed/mq/kafka/gateway/test_mock_handler.go11
1 files changed, 8 insertions, 3 deletions
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
index 4bb0e28b1..c01aac970 100644
--- a/weed/mq/kafka/gateway/test_mock_handler.go
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -98,7 +98,11 @@ func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTop
return info, exists
}
-func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+func (m *mockSeaweedMQHandler) InvalidateTopicExistsCache(topic string) {
+ // Mock handler doesn't cache topic existence, so this is a no-op
+}
+
+func (m *mockSeaweedMQHandler) ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -117,6 +121,7 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
offset := m.offsets[topicName][partitionID]
m.offsets[topicName][partitionID]++
+
// Store record
record := &mockRecord{
key: key,
@@ -128,8 +133,8 @@ func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32
return offset, nil
}
-func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
- return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
+func (m *mockSeaweedMQHandler) ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+ return m.ProduceRecord(ctx, topicName, partitionID, key, recordValueBytes)
}
func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {