aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/broker_client.go
blob: 2bb632cccb8a1267d65fe6e6557f2f0620b75ff1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package schema

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// BrokerClient wraps pub_client.TopicPublisher to handle schematized messages.
//
// It decodes Confluent-framed payloads with schemaManager and publishes the
// resulting records to the configured brokers, lazily creating and caching
// publishers and subscribers. Each cache has its own RWMutex. Construct it
// with NewBrokerClient: the zero value has nil cache maps, and the first
// insert into them would panic.
type BrokerClient struct {
	// Brokers to connect to and the schema manager, copied from BrokerClientConfig.
	brokers       []string
	schemaManager *Manager

	// Publisher cache, guarded by publishersLock. Raw publishers are keyed by
	// the bare topic name; schematized ones by "<topic>:schematized".
	publishersLock sync.RWMutex
	publishers     map[string]*pub_client.TopicPublisher

	// Subscriber cache: topic name -> subscriber, guarded by subscribersLock.
	subscribersLock sync.RWMutex
	subscribers     map[string]*sub_client.TopicSubscriber
}

// BrokerClientConfig holds configuration for the broker client.
//
// NewBrokerClient stores both fields on the client as given: no defaults are
// applied and the Brokers slice is not copied.
type BrokerClientConfig struct {
	Brokers       []string
	SchemaManager *Manager
}

// NewBrokerClient builds a BrokerClient from config with empty publisher and
// subscriber caches. Publishers and subscribers are created lazily on first
// use, so construction performs no network I/O.
func NewBrokerClient(config BrokerClientConfig) *BrokerClient {
	client := &BrokerClient{
		brokers:       config.Brokers,
		schemaManager: config.SchemaManager,
	}
	client.publishers = make(map[string]*pub_client.TopicPublisher)
	client.subscribers = make(map[string]*sub_client.TopicSubscriber)
	return client
}

// PublishSchematizedMessage decodes a Confluent-framed messageBytes with the
// schema manager and publishes the decoded RecordValue under key to
// topicName. It uses the topic's schematized publisher, which is created on
// first use with the decoded RecordType.
//
// Decode and publisher-lookup failures are wrapped with context; the error
// from the publish call itself is returned unwrapped.
func (bc *BrokerClient) PublishSchematizedMessage(topicName string, key []byte, messageBytes []byte) error {
	msg, decodeErr := bc.schemaManager.DecodeMessage(messageBytes)
	if decodeErr != nil {
		return fmt.Errorf("failed to decode schematized message: %w", decodeErr)
	}

	pub, lookupErr := bc.getOrCreatePublisher(topicName, msg.RecordType)
	if lookupErr != nil {
		return fmt.Errorf("failed to get publisher for topic %s: %w", topicName, lookupErr)
	}

	return pub.PublishRecord(key, msg.RecordValue)
}

// PublishRawMessage publishes value under key to topicName without any
// schema decoding. It uses the topic's raw publisher (created with no
// RecordType), which is separate from the schematized one.
func (bc *BrokerClient) PublishRawMessage(topicName string, key []byte, value []byte) error {
	pub, lookupErr := bc.getOrCreatePublisher(topicName, nil)
	if lookupErr != nil {
		return fmt.Errorf("failed to get publisher for topic %s: %w", topicName, lookupErr)
	}
	return pub.Publish(key, value)
}

// getOrCreatePublisher returns the cached TopicPublisher for topicName,
// creating and caching one on first use.
//
// Raw publishers (recordType == nil) are cached under the bare topic name.
// Schematized publishers are cached under "<topic>:schematized" and keep
// the recordType from the call that created them.
//
// Every publisher targets the "kafka" namespace with a single partition.
// Creation happens while holding the exclusive lock, so at most one
// publisher is ever built per cache key.
//
// NOTE(review): later calls with a different recordType for the same topic
// reuse the first publisher — confirm this is intended when schemas evolve.
func (bc *BrokerClient) getOrCreatePublisher(topicName string, recordType *schema_pb.RecordType) (*pub_client.TopicPublisher, error) {
	key := topicName
	if recordType != nil {
		key = fmt.Sprintf("%s:schematized", topicName)
	}

	// Fast path: look up under the shared lock only.
	bc.publishersLock.RLock()
	existing, found := bc.publishers[key]
	bc.publishersLock.RUnlock()
	if found {
		return existing, nil
	}

	// Slow path: another goroutine may have created the publisher between
	// releasing the read lock and acquiring the write lock, so re-check.
	bc.publishersLock.Lock()
	defer bc.publishersLock.Unlock()
	if existing, found := bc.publishers[key]; found {
		return existing, nil
	}

	created, err := pub_client.NewTopicPublisher(&pub_client.PublisherConfiguration{
		Topic:          topic.NewTopic("kafka", topicName),
		PartitionCount: 1,
		Brokers:        bc.brokers,
		PublisherName:  "kafka-gateway-schema",
		RecordType:     recordType, // nil for raw publishers
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create topic publisher: %w", err)
	}

	bc.publishers[key] = created
	return created, nil
}

// FetchSchematizedMessages fetches up to maxMessages RecordValue messages for
// topicName from mq.broker and re-encodes each into a Confluent-framed payload.
//
// Fetching stops at the first receive error, which is treated as "no more
// messages available". Records that fail to re-encode are skipped on a
// best-effort basis, so fewer than maxMessages payloads may come back.
//
// Once a subscriber is available, a non-positive maxMessages yields an
// empty, non-nil slice. An error is returned only when no subscriber can be
// obtained for the topic.
func (bc *BrokerClient) FetchSchematizedMessages(topicName string, maxMessages int) ([][]byte, error) {
	subscriber, err := bc.getOrCreateSubscriber(topicName)
	if err != nil {
		return nil, fmt.Errorf("failed to get subscriber for topic %s: %w", topicName, err)
	}

	// make panics on a negative capacity, so treat negative limits like zero.
	if maxMessages < 0 {
		maxMessages = 0
	}

	// Bound the up-front allocation: the limit is only an upper bound, and a
	// very large one would otherwise allocate a huge backing array eagerly.
	const maxPrealloc = 1024
	capHint := maxMessages
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}

	messages := make([][]byte, 0, capHint)
	for len(messages) < maxMessages {
		recordValue, err := bc.receiveRecordValue(subscriber)
		if err != nil {
			break // no more messages available
		}

		envelope, err := bc.reconstructConfluentEnvelope(recordValue)
		if err != nil {
			// Best effort: skip records that cannot be re-encoded.
			continue
		}

		messages = append(messages, envelope)
	}

	return messages, nil
}

// getOrCreateSubscriber returns the cached TopicSubscriber for topicName, if
// any. Otherwise it probes the brokers with a fresh subscriber for the
// topic in the "kafka" namespace, starting from the earliest offset.
//
// NOTE(review): the probe never caches or returns a subscriber. Every probe
// outcome is reported as an error:
//   - Subscribe failed: the error wraps the real Subscribe error.
//   - Subscribe is still running after the probe timeout: "connection timeout".
//   - Subscribe returned nil: the subscriber exited on its own.
// The probe subscriber is stopped (its context canceled) when this function
// returns.
//
// subscribersLock is held exclusively for the whole probe (up to the timeout).
func (bc *BrokerClient) getOrCreateSubscriber(topicName string) (*sub_client.TopicSubscriber, error) {
	// Fast path: shared lock only.
	bc.subscribersLock.RLock()
	if subscriber, exists := bc.subscribers[topicName]; exists {
		bc.subscribersLock.RUnlock()
		return subscriber, nil
	}
	bc.subscribersLock.RUnlock()

	bc.subscribersLock.Lock()
	defer bc.subscribersLock.Unlock()

	// Double-check after acquiring the write lock.
	if subscriber, exists := bc.subscribers[topicName]; exists {
		return subscriber, nil
	}

	subscriberConfig := &sub_client.SubscriberConfiguration{
		ClientId:                "kafka-gateway-schema",
		ConsumerGroup:           "kafka-gateway",
		ConsumerGroupInstanceId: fmt.Sprintf("kafka-gateway-%s", topicName),
		MaxPartitionCount:       1,
		SlidingWindowSize:       10,
	}

	contentConfig := &sub_client.ContentConfiguration{
		Topic:      topic.NewTopic("kafka", topicName),
		Filter:     "",
		OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
	}

	partitionOffsetChan := make(chan sub_client.KeyedTimestamp, 100)

	// probeCtx bounds the probe subscriber's lifetime; the deferred cancel
	// stops it however this function returns.
	probeCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	probe := sub_client.NewTopicSubscriber(
		probeCtx,
		bc.brokers,
		subscriberConfig,
		contentConfig,
		partitionOffsetChan,
	)

	// Buffered so the probe goroutine can always deliver its result and exit,
	// even after we have stopped waiting for it.
	subscribeErr := make(chan error, 1)
	go func() {
		subscribeErr <- probe.Subscribe()
	}()

	const connectProbeTimeout = 100 * time.Millisecond
	timer := time.NewTimer(connectProbeTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil, fmt.Errorf("failed to connect to brokers: connection timeout")
	case err := <-subscribeErr:
		if err != nil {
			// Surface the real cause rather than a generic context error.
			return nil, fmt.Errorf("failed to connect to brokers: %w", err)
		}
		return nil, fmt.Errorf("failed to connect to brokers: subscriber exited before becoming ready")
	}
}

// receiveRecordValue is intended to pull the next RecordValue from
// subscriber.
//
// It is currently a stub. Receiving from the subscriber is not wired up
// yet, so subscriber is ignored and the function always reports that no
// message is available.
func (bc *BrokerClient) receiveRecordValue(subscriber *sub_client.TopicSubscriber) (*schema_pb.RecordValue, error) {
	_ = subscriber // not consulted until message receiving is implemented
	var none *schema_pb.RecordValue
	return none, fmt.Errorf("no messages available")
}

// reconstructConfluentEnvelope turns recordValue back into an encoded
// payload via the schema manager's EncodeMessage.
//
// This is a placeholder: the schema ID and format are hard-coded (ID 1,
// Avro) instead of being recovered from metadata stored with the record.
// A complete version would:
//  1. read the original schema ID from the record's metadata,
//  2. look up the schema format in the registry,
//  3. re-encode the record in that format, and
//  4. frame it as magic byte + schema ID + encoded data.
//
// NOTE(review): whether EncodeMessage already adds the Confluent framing is
// not visible here — confirm against Manager.EncodeMessage.
func (bc *BrokerClient) reconstructConfluentEnvelope(recordValue *schema_pb.RecordValue) ([]byte, error) {
	var (
		schemaID = uint32(1)  // placeholder until schema metadata is persisted
		format   = FormatAvro // placeholder until the registry is consulted
	)

	envelope, encodeErr := bc.schemaManager.EncodeMessage(recordValue, schemaID, format)
	if encodeErr != nil {
		return nil, fmt.Errorf("failed to encode RecordValue: %w", encodeErr)
	}
	return envelope, nil
}

// Close finishes and shuts down every cached publisher and empties both the
// publisher and subscriber caches.
//
// Close still attempts every step after a failure. All failures are
// returned together via errors.Join (nil when everything closed cleanly),
// so no error is silently overwritten by a later one. The caches are
// emptied regardless of errors.
func (bc *BrokerClient) Close() error {
	var errs []error

	bc.publishersLock.Lock()
	for key, publisher := range bc.publishers {
		if err := publisher.FinishPublish(); err != nil {
			errs = append(errs, fmt.Errorf("failed to finish publisher %s: %w", key, err))
		}
		if err := publisher.Shutdown(); err != nil {
			errs = append(errs, fmt.Errorf("failed to shutdown publisher %s: %w", key, err))
		}
		delete(bc.publishers, key) // deleting during range is safe in Go
	}
	bc.publishersLock.Unlock()

	bc.subscribersLock.Lock()
	for key := range bc.subscribers {
		// TopicSubscriber has no Shutdown method in the current implementation,
		// so the cached reference is simply dropped.
		delete(bc.subscribers, key)
	}
	bc.subscribersLock.Unlock()

	return errors.Join(errs...)
}

// GetPublisherStats returns a snapshot of the client's caches:
//   - "active_publishers", "active_subscribers": cache sizes (int).
//   - "brokers": the configured broker list (the client's own slice, not a copy).
//   - "publisher_topics", "subscriber_topics": cache keys ([]string). Schematized
//     publishers appear as "<topic>:schematized".
//   - "topics": the de-duplicated union of the two key lists, publisher keys
//     first, kept for backward compatibility with tests.
//
// Key order within each list follows map iteration and is not stable.
// Both caches are read-locked for the duration of the call.
func (bc *BrokerClient) GetPublisherStats() map[string]interface{} {
	bc.publishersLock.RLock()
	defer bc.publishersLock.RUnlock()
	bc.subscribersLock.RLock()
	defer bc.subscribersLock.RUnlock()

	publisherKeys := make([]string, 0, len(bc.publishers))
	for k := range bc.publishers {
		publisherKeys = append(publisherKeys, k)
	}

	subscriberKeys := make([]string, 0, len(bc.subscribers))
	for k := range bc.subscribers {
		subscriberKeys = append(subscriberKeys, k)
	}

	seen := make(map[string]struct{}, len(publisherKeys)+len(subscriberKeys))
	union := make([]string, 0)
	for _, group := range [][]string{publisherKeys, subscriberKeys} {
		for _, name := range group {
			if _, dup := seen[name]; dup {
				continue
			}
			seen[name] = struct{}{}
			union = append(union, name)
		}
	}

	return map[string]interface{}{
		"active_publishers":  len(bc.publishers),
		"active_subscribers": len(bc.subscribers),
		"brokers":            bc.brokers,
		"publisher_topics":   publisherKeys,
		"subscriber_topics":  subscriberKeys,
		"topics":             union,
	}
}

// IsSchematized reports whether messageBytes is Confluent-framed. The check
// is delegated entirely to the client's schema manager.
func (bc *BrokerClient) IsSchematized(messageBytes []byte) bool {
	mgr := bc.schemaManager
	return mgr.IsSchematized(messageBytes)
}

// ValidateMessage decodes a schematized messageBytes without publishing it.
// It returns the decoded message, or the schema manager's decode error
// unchanged.
func (bc *BrokerClient) ValidateMessage(messageBytes []byte) (*DecodedMessage, error) {
	decoded, err := bc.schemaManager.DecodeMessage(messageBytes)
	return decoded, err
}

// CreateRecordType derives a RecordType for the schema identified by
// schemaID.
//
// It fetches the schema text from the registry, builds the decoder that
// matches format (Avro, JSON Schema or Protobuf), and returns the decoder's
// inferred RecordType. Registry or decoder-construction failures are
// wrapped with context. Any other format yields an "unsupported schema
// format" error.
func (bc *BrokerClient) CreateRecordType(schemaID uint32, format Format) (*schema_pb.RecordType, error) {
	cached, lookupErr := bc.schemaManager.registryClient.GetSchemaByID(schemaID)
	if lookupErr != nil {
		return nil, fmt.Errorf("failed to get schema %d: %w", schemaID, lookupErr)
	}
	mgr := bc.schemaManager

	switch format {
	case FormatAvro:
		avroDec, decErr := mgr.getAvroDecoder(schemaID, cached.Schema)
		if decErr != nil {
			return nil, fmt.Errorf("failed to create Avro decoder: %w", decErr)
		}
		return avroDec.InferRecordType()

	case FormatJSONSchema:
		jsonDec, decErr := mgr.getJSONSchemaDecoder(schemaID, cached.Schema)
		if decErr != nil {
			return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", decErr)
		}
		return jsonDec.InferRecordType()

	case FormatProtobuf:
		protoDec, decErr := mgr.getProtobufDecoder(schemaID, cached.Schema)
		if decErr != nil {
			return nil, fmt.Errorf("failed to create Protobuf decoder: %w", decErr)
		}
		return protoDec.InferRecordType()

	default:
		return nil, fmt.Errorf("unsupported schema format: %v", format)
	}
}