aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/simple-publisher/main.go
blob: 6b7b4dffeaccba39a6d5d97197bc45f62b97d28d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// main runs the publisher and terminates with a non-zero exit status when any
// message could not be marshalled or written, or when the writer fails to
// close. The work lives in run so that its deferred writer.Close executes
// before log.Fatalf ends the process.
func main() {
	if err := run(); err != nil {
		log.Fatalf("simple-publisher: %v", err)
	}
}

// run publishes two batches of test messages to the "_raw_messages" topic via
// the Kafka gateway at localhost:9093: first three JSON-encoded maps, then
// four hand-built raw messages (plain text, JSON text, empty value, nil key).
//
// A failed write is logged and counted, and publishing continues with the next
// message. run returns a non-nil error if a message cannot be marshalled, if
// at least one write failed, or if closing the writer fails.
func run() (err error) {
	// Configuration
	brokerAddress := "localhost:9093" // Kafka gateway port (not SeaweedMQ broker port 17777)
	topicName := "_raw_messages"      // Topic with "_" prefix - should skip schema validation

	fmt.Printf("Publishing messages to topic '%s' on broker '%s'\n", topicName, brokerAddress)

	// Create a new writer
	writer := &kafka.Writer{
		Addr:     kafka.TCP(brokerAddress),
		Topic:    topicName,
		Balancer: &kafka.LeastBytes{},
		// Configure for immediate delivery (useful for testing)
		BatchTimeout: 10 * time.Millisecond,
		BatchSize:    1,
	}
	// Report a close failure instead of dropping it, but never let it mask an
	// earlier, more specific error.
	defer func() {
		if closeErr := writer.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("closing writer: %w", closeErr)
		}
	}()

	// Sample data to publish
	messages := []map[string]interface{}{
		{
			"id":        1,
			"message":   "Hello from kafka-go client",
			"timestamp": time.Now().Unix(),
			"user_id":   "user123",
		},
		{
			"id":        2,
			"message":   "Raw message without schema validation",
			"timestamp": time.Now().Unix(),
			"user_id":   "user456",
			"metadata": map[string]string{
				"source": "test-client",
				"type":   "raw",
			},
		},
		{
			"id":        3,
			"message":   "Testing SMQ with underscore prefix topic",
			"timestamp": time.Now().Unix(),
			"user_id":   "user789",
			// encoding/json emits []byte values as base64 strings.
			"data": []byte("Some binary data here"),
		},
	}

	ctx := context.Background()

	// failed counts every write that returned an error, across both batches.
	failed := 0

	fmt.Println("Publishing messages...")
	for i, msgData := range messages {
		// Convert message to JSON (simulating raw messages stored in "value" field)
		valueBytes, marshalErr := json.Marshal(msgData)
		if marshalErr != nil {
			// Returning (rather than log.Fatalf here) lets the deferred
			// writer.Close run before the process exits.
			return fmt.Errorf("marshalling message %d: %w", i+1, marshalErr)
		}

		// Create Kafka message
		msg := kafka.Message{
			Key:   []byte(fmt.Sprintf("key_%d", msgData["id"])),
			Value: valueBytes,
			Headers: []kafka.Header{
				{Key: "source", Value: []byte("kafka-go-client")},
				{Key: "content-type", Value: []byte("application/json")},
			},
		}

		// Write message
		if writeErr := writer.WriteMessages(ctx, msg); writeErr != nil {
			log.Printf("Failed to write message %d: %v", i+1, writeErr)
			failed++
			continue
		}

		fmt.Printf("-Published message %d: %s\n", i+1, string(valueBytes))

		// Small delay between messages
		time.Sleep(100 * time.Millisecond)
	}

	// Only claim success when every write in this batch went through.
	if failed == 0 {
		fmt.Println("\nAll messages published successfully!")
	} else {
		fmt.Printf("\n%d of %d messages failed to publish\n", failed, len(messages))
	}

	// Test with different raw message types
	fmt.Println("\nPublishing different raw message formats...")

	rawMessages := []kafka.Message{
		{
			Key:   []byte("binary_key"),
			Value: []byte("Simple string message"),
		},
		{
			Key:   []byte("json_key"),
			Value: []byte(`{"raw_field": "raw_value", "number": 42}`),
		},
		{
			Key:   []byte("empty_key"),
			Value: []byte{}, // Empty value
		},
		{
			Key:   nil, // No key
			Value: []byte("Message with no key"),
		},
	}

	for i, msg := range rawMessages {
		if writeErr := writer.WriteMessages(ctx, msg); writeErr != nil {
			log.Printf("Failed to write raw message %d: %v", i+1, writeErr)
			failed++
			continue
		}
		fmt.Printf("-Published raw message %d: key=%s, value=%s\n",
			i+1, string(msg.Key), string(msg.Value))
	}

	if failed > 0 {
		return fmt.Errorf("%d of %d messages could not be published",
			failed, len(messages)+len(rawMessages))
	}

	fmt.Println("\nAll test messages published to topic with '_' prefix!")
	fmt.Println("These messages should be stored as raw bytes without schema validation.")
	return nil
}