aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/types.go
blob: d707045e682a00f3c28312229794be87f7ad3dda (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package integration

import (
	"context"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// SMQRecord is the read-side abstraction for a single record fetched from
// SeaweedMQ, exposing exactly the fields the Kafka protocol handlers need.
type SMQRecord interface {
	// GetKey returns the record key bytes.
	GetKey() []byte
	// GetValue returns the record payload bytes.
	GetValue() []byte
	// GetTimestamp returns the record timestamp as int64
	// (units not shown here; presumably nanoseconds, matching
	// PartitionRangeInfo's *TimestampNs fields — confirm at producers).
	GetTimestamp() int64
	// GetOffset returns the Kafka-style logical offset of this record.
	GetOffset() int64
}

// hwmCacheEntry represents a cached high water mark value for a single
// topic-partition, valid until expiresAt (see SeaweedMQHandler.hwmCacheTTL).
type hwmCacheEntry struct {
	value     int64     // cached high water mark offset
	expiresAt time.Time // entry is stale once this instant has passed
}

// topicExistsCacheEntry represents a cached topic existence check.
// Note that negative results (exists == false) are cached too, so a freshly
// created topic may appear missing until the entry expires.
type topicExistsCacheEntry struct {
	exists    bool      // result of the last broker-side existence check
	expiresAt time.Time // entry is stale once this instant has passed
}

// SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ
// storage. It bundles the shared clients used to reach the filer, master and
// brokers, plus two small TTL caches that cut down repeated broker queries.
type SeaweedMQHandler struct {
	// Shared filer client accessor for all components
	filerClientAccessor *filer_client.FilerClientAccessor

	brokerClient *BrokerClient // For broker-based connections

	// Master client for service discovery
	masterClient *wdclient.MasterClient

	// Discovered broker addresses (for Metadata responses)
	brokerAddresses []string

	// Reference to protocol handler for accessing connection context
	protocolHandler ProtocolHandler

	// High water mark cache to reduce broker queries
	hwmCache    map[string]*hwmCacheEntry // key: "topic:partition"
	hwmCacheMu  sync.RWMutex              // guards hwmCache
	hwmCacheTTL time.Duration             // lifetime of a cached HWM entry

	// Topic existence cache to reduce broker queries
	topicExistsCache    map[string]*topicExistsCacheEntry // key: "topic"
	topicExistsCacheMu  sync.RWMutex                      // guards topicExistsCache
	topicExistsCacheTTL time.Duration                     // lifetime of a cached existence entry
}

// ConnectionContext holds connection-specific information for requests.
// This is a local copy to avoid a circular dependency with the protocol
// package; keep the two definitions in sync.
type ConnectionContext struct {
	ClientID      string      // Kafka client ID from request headers
	ConsumerGroup string      // Consumer group (set by JoinGroup)
	MemberID      string      // Consumer group member ID (set by JoinGroup)
	BrokerClient  interface{} // Per-connection broker client (*BrokerClient); untyped to avoid the import cycle
}

// ProtocolHandler is the minimal interface the handler layer must satisfy so
// this package can read per-connection state without importing the protocol
// package (which would create an import cycle).
type ProtocolHandler interface {
	// GetConnectionContext returns the context of the current connection.
	GetConnectionContext() *ConnectionContext
}

// KafkaTopicInfo holds Kafka-specific topic information alongside the
// corresponding SeaweedMQ topic handle.
type KafkaTopicInfo struct {
	Name       string // Kafka topic name
	Partitions int32  // number of Kafka partitions
	CreatedAt  int64  // creation time as int64 (epoch units not shown here — confirm at call sites)

	// SeaweedTopic is the backing SeaweedMQ topic descriptor.
	SeaweedTopic *schema_pb.Topic
}

// TopicPartitionKey uniquely identifies a topic partition.
// It is a comparable value type, suitable for use as a map key.
type TopicPartitionKey struct {
	Topic     string // Kafka topic name
	Partition int32  // Kafka partition index
}

// SeaweedRecord represents a record received from SeaweedMQ as plain data
// (contrast with SeaweedSMQRecord, which adapts a record to the SMQRecord
// interface).
type SeaweedRecord struct {
	Key       []byte // record key
	Value     []byte // record payload
	Timestamp int64  // record timestamp (units not shown here — presumably nanoseconds; confirm)
	Offset    int64  // Kafka-style logical offset
}

// PartitionRangeInfo contains comprehensive range information for a partition:
// its offset bounds, timestamp bounds, and basic usage metadata.
type PartitionRangeInfo struct {
	// Offset range information
	EarliestOffset int64 // lowest offset still available
	LatestOffset   int64 // highest offset present
	HighWaterMark  int64 // next offset to be assigned

	// Timestamp range information (nanoseconds, per the Ns suffix)
	EarliestTimestampNs int64
	LatestTimestampNs   int64

	// Partition metadata
	RecordCount         int64 // number of records in the partition
	ActiveSubscriptions int64 // number of currently active subscribers
}

// SeaweedSMQRecord implements the SMQRecord interface for SeaweedMQ records.
// It is an immutable value: all fields are unexported and only readable via
// the Get* accessors below.
type SeaweedSMQRecord struct {
	key       []byte // record key, returned by GetKey
	value     []byte // record payload, returned by GetValue
	timestamp int64  // record timestamp, returned by GetTimestamp
	offset    int64  // Kafka offset, returned by GetOffset
}

// GetKey returns the record key. Part of the SMQRecord interface.
func (r *SeaweedSMQRecord) GetKey() []byte {
	return r.key
}

// GetValue returns the record value. Part of the SMQRecord interface.
func (r *SeaweedSMQRecord) GetValue() []byte {
	return r.value
}

// GetTimestamp returns the record timestamp. Part of the SMQRecord interface.
func (r *SeaweedSMQRecord) GetTimestamp() int64 {
	return r.timestamp
}

// GetOffset returns the Kafka offset for this record. Part of the SMQRecord
// interface.
func (r *SeaweedSMQRecord) GetOffset() int64 {
	return r.offset
}

// FetchRequest tracks an in-flight fetch request with multiple waiters, so
// that concurrent fetches for the same topic/partition/offset can be
// deduplicated and share one broker round-trip.
//
// (The "BrokerClient wraps the SeaweedMQ Broker gRPC client" comment that
// previously sat here described the BrokerClient type declared further down,
// not this struct; godoc was attaching it to FetchRequest.)
type FetchRequest struct {
	topic      string
	partition  int32
	offset     int64
	resultChan chan FetchResult   // Single channel for the fetch result
	waiters    []chan FetchResult // Multiple waiters can subscribe
	mu         sync.Mutex         // presumably guards waiters/inProgress — confirm at use sites
	inProgress bool               // true while a fetch for this request is running
}

// FetchResult contains the result of a fetch operation: either the records
// read or the error that occurred. Delivered to FetchRequest waiters.
type FetchResult struct {
	records []*SeaweedRecord // fetched records (nil/empty on error)
	err     error            // non-nil if the fetch failed
}

// partitionAssignmentCacheEntry caches LookupTopicBrokers results for one
// topic, valid until expiresAt (see BrokerClient.partitionAssignmentCacheTTL).
type partitionAssignmentCacheEntry struct {
	assignments []*mq_pb.BrokerPartitionAssignment // broker->partition assignments for the topic
	expiresAt   time.Time                          // entry is stale once this instant has passed
}

// BrokerClient wraps the SeaweedMQ Broker gRPC client for Kafka gateway
// integration. It owns the gRPC connection plus per-topic-partition publisher
// and subscriber sessions, and caches partition assignments to avoid repeated
// LookupTopicBrokers calls.
type BrokerClient struct {
	// Reference to shared filer client accessor
	filerClientAccessor *filer_client.FilerClientAccessor

	brokerAddress string                       // address of the broker this client talks to
	conn          *grpc.ClientConn             // underlying gRPC connection
	client        mq_pb.SeaweedMessagingClient // generated messaging client over conn

	// Publisher streams: topic-partition -> stream info
	publishersLock sync.RWMutex
	publishers     map[string]*BrokerPublisherSession

	// Publisher creation locks to prevent concurrent creation attempts for the same topic-partition
	publisherCreationLocks map[string]*sync.Mutex

	// Subscriber streams for offset tracking
	subscribersLock sync.RWMutex
	subscribers     map[string]*BrokerSubscriberSession

	// Request deduplication for stateless fetches
	fetchRequestsLock sync.Mutex
	fetchRequests     map[string]*FetchRequest

	// Partition assignment cache to reduce LookupTopicBrokers calls (13.5% CPU overhead!)
	partitionAssignmentCache    map[string]*partitionAssignmentCacheEntry // Key: topic name
	partitionAssignmentCacheMu  sync.RWMutex
	partitionAssignmentCacheTTL time.Duration

	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention; presumably this pair scopes the client's background
	// streams and cancel tears them down — confirm in the constructor.
	ctx    context.Context
	cancel context.CancelFunc
}

// BrokerPublisherSession tracks a publishing stream to a SeaweedMQ broker
// for one topic partition.
type BrokerPublisherSession struct {
	Topic     string                                       // topic being published to
	Partition int32                                        // partition being published to
	Stream    mq_pb.SeaweedMessaging_PublishMessageClient  // bidirectional publish stream
	mu        sync.Mutex // Protects Send/Recv pairs from concurrent access
}

// BrokerSubscriberSession tracks a subscription stream for offset management.
// All operations on a session are serialized via mu; a small record cache
// avoids re-reading data already pulled from the broker stream.
type BrokerSubscriberSession struct {
	Topic     string // topic being consumed
	Partition int32  // partition being consumed
	Stream    mq_pb.SeaweedMessaging_SubscribeMessageClient
	// Track the requested start offset used to initialize this stream
	StartOffset int64
	// Consumer group identity for this session
	ConsumerGroup string
	ConsumerID    string
	// Context for canceling reads (used for timeout)
	Ctx    context.Context
	Cancel context.CancelFunc
	// Mutex to serialize all operations on this session
	mu sync.Mutex
	// Cache of consumed records to avoid re-reading from broker
	consumedRecords  []*SeaweedRecord
	nextOffsetToRead int64
	// Track what has actually been READ from the stream (not what was requested)
	// This is the HIGHEST offset that has been read from the stream
	// Used to determine if we need to seek or can continue reading
	lastReadOffset int64
	// Flag to indicate if this session has been initialized
	initialized bool
}

// Key generates a unique key for this subscriber session.
// Consumer group and consumer ID are folded into the key so that two
// different consumers can never share a cached session for the same
// topic partition.
func (s *BrokerSubscriberSession) Key() string {
	consumer := fmt.Sprintf("%s-%s", s.ConsumerGroup, s.ConsumerID)
	return fmt.Sprintf("%s-%d-%s", s.Topic, s.Partition, consumer)
}