1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
func main() {
// Configuration
brokerAddress := "localhost:9093" // Kafka gateway port (not SeaweedMQ broker port 17777)
topicName := "_raw_messages" // Topic with "_" prefix - should skip schema validation
fmt.Printf("Publishing messages to topic '%s' on broker '%s'\n", topicName, brokerAddress)
// Create a new writer
writer := &kafka.Writer{
Addr: kafka.TCP(brokerAddress),
Topic: topicName,
Balancer: &kafka.LeastBytes{},
// Configure for immediate delivery (useful for testing)
BatchTimeout: 10 * time.Millisecond,
BatchSize: 1,
}
defer writer.Close()
// Sample data to publish
messages := []map[string]interface{}{
{
"id": 1,
"message": "Hello from kafka-go client",
"timestamp": time.Now().Unix(),
"user_id": "user123",
},
{
"id": 2,
"message": "Raw message without schema validation",
"timestamp": time.Now().Unix(),
"user_id": "user456",
"metadata": map[string]string{
"source": "test-client",
"type": "raw",
},
},
{
"id": 3,
"message": "Testing SMQ with underscore prefix topic",
"timestamp": time.Now().Unix(),
"user_id": "user789",
"data": []byte("Some binary data here"),
},
}
ctx := context.Background()
fmt.Println("Publishing messages...")
for i, msgData := range messages {
// Convert message to JSON (simulating raw messages stored in "value" field)
valueBytes, err := json.Marshal(msgData)
if err != nil {
log.Fatalf("Failed to marshal message %d: %v", i+1, err)
}
// Create Kafka message
msg := kafka.Message{
Key: []byte(fmt.Sprintf("key_%d", msgData["id"])),
Value: valueBytes,
Headers: []kafka.Header{
{Key: "source", Value: []byte("kafka-go-client")},
{Key: "content-type", Value: []byte("application/json")},
},
}
// Write message
err = writer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("Failed to write message %d: %v", i+1, err)
continue
}
fmt.Printf("-Published message %d: %s\n", i+1, string(valueBytes))
// Small delay between messages
time.Sleep(100 * time.Millisecond)
}
fmt.Println("\nAll messages published successfully!")
// Test with different raw message types
fmt.Println("\nPublishing different raw message formats...")
rawMessages := []kafka.Message{
{
Key: []byte("binary_key"),
Value: []byte("Simple string message"),
},
{
Key: []byte("json_key"),
Value: []byte(`{"raw_field": "raw_value", "number": 42}`),
},
{
Key: []byte("empty_key"),
Value: []byte{}, // Empty value
},
{
Key: nil, // No key
Value: []byte("Message with no key"),
},
}
for i, msg := range rawMessages {
err := writer.WriteMessages(ctx, msg)
if err != nil {
log.Printf("Failed to write raw message %d: %v", i+1, err)
continue
}
fmt.Printf("-Published raw message %d: key=%s, value=%s\n",
i+1, string(msg.Key), string(msg.Value))
}
fmt.Println("\nAll test messages published to topic with '_' prefix!")
fmt.Println("These messages should be stored as raw bytes without schema validation.")
}
|