aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/seaweedmq_handler_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/integration/seaweedmq_handler_test.go')
-rw-r--r--weed/mq/kafka/integration/seaweedmq_handler_test.go11
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/kafka/integration/seaweedmq_handler_test.go b/weed/mq/kafka/integration/seaweedmq_handler_test.go
index a01152e79..d16d8e10f 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler_test.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler_test.go
@@ -1,6 +1,7 @@
package integration
import (
+ "context"
"testing"
"time"
)
@@ -269,7 +270,7 @@ func TestSeaweedMQHandler_ProduceRecord(t *testing.T) {
key := []byte("produce-key")
value := []byte("produce-value")
- offset, err := handler.ProduceRecord(topicName, 0, key, value)
+ offset, err := handler.ProduceRecord(context.Background(), topicName, 0, key, value)
if err != nil {
t.Fatalf("Failed to produce record: %v", err)
}
@@ -316,7 +317,7 @@ func TestSeaweedMQHandler_MultiplePartitions(t *testing.T) {
key := []byte("partition-key")
value := []byte("partition-value")
- offset, err := handler.ProduceRecord(topicName, partitionID, key, value)
+ offset, err := handler.ProduceRecord(context.Background(), topicName, partitionID, key, value)
if err != nil {
t.Fatalf("Failed to produce to partition %d: %v", partitionID, err)
}
@@ -366,7 +367,7 @@ func TestSeaweedMQHandler_FetchRecords(t *testing.T) {
var producedOffsets []int64
for i, record := range testRecords {
- offset, err := handler.ProduceRecord(topicName, 0, []byte(record.key), []byte(record.value))
+ offset, err := handler.ProduceRecord(context.Background(), topicName, 0, []byte(record.key), []byte(record.value))
if err != nil {
t.Fatalf("Failed to produce record %d: %v", i, err)
}
@@ -463,7 +464,7 @@ func TestSeaweedMQHandler_FetchRecords_ErrorHandling(t *testing.T) {
}
// Test with very small maxBytes
- _, err = handler.ProduceRecord(topicName, 0, []byte("key"), []byte("value"))
+ _, err = handler.ProduceRecord(context.Background(), topicName, 0, []byte("key"), []byte("value"))
if err != nil {
t.Fatalf("Failed to produce test record: %v", err)
}
@@ -490,7 +491,7 @@ func TestSeaweedMQHandler_ErrorHandling(t *testing.T) {
defer handler.Close()
// Try to produce to non-existent topic
- _, err = handler.ProduceRecord("non-existent-topic", 0, []byte("key"), []byte("value"))
+ _, err = handler.ProduceRecord(context.Background(), "non-existent-topic", 0, []byte("key"), []byte("value"))
if err == nil {
t.Errorf("Producing to non-existent topic should fail")
}