package integration
import (
"context"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
// SMQRecord interface for records from SeaweedMQ
type SMQRecord interface {
GetKey() []byte
GetValue() []byte
GetTimestamp() int64
GetOffset() int64
}
// hwmCacheEntry represents a cached high water mark value
type hwmCacheEntry struct {
value int64
expiresAt time.Time
}
// topicExistsCacheEntry represents a cached topic existence check
type topicExistsCacheEntry struct {
exists bool
expiresAt time.Time
}
// SeaweedMQHandler integrates Kafka protocol handlers with real SeaweedMQ storage
type SeaweedMQHandler struct {
// Shared filer client accessor for all components
filerClientAccessor *filer_client.FilerClientAccessor
brokerClient *BrokerClient // For broker-based connections
// Master client for service discovery
masterClient *wdclient.MasterClient
// Discovered broker addresses (for Metadata responses)
brokerAddresses []string
// Reference to protocol handler for accessing connection context
protocolHandler ProtocolHandler
// High water mark cache to reduce broker queries
hwmCache map[string]*hwmCacheEntry // key: "topic:partition"
hwmCacheMu sync.RWMutex
hwmCacheTTL time.Duration
// Topic existence cache to reduce broker queries
topicExistsCache map[string]*topicExistsCacheEntry // key: "topic"
topicExistsCacheMu sync.RWMutex
topicExistsCacheTTL time.Duration
}
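// cachedHWMSketch illustrates how the TTL-guarded high water mark cache above
// can be consulted. This is a sketch only: the method name and the
// caller-supplied fetch fallback are assumptions, not the package's real API.
func (h *SeaweedMQHandler) cachedHWMSketch(topic string, partition int32, fetch func() (int64, error)) (int64, error) {
	key := fmt.Sprintf("%s:%d", topic, partition)

	// Fast path: serve the cached value while it is still fresh.
	h.hwmCacheMu.RLock()
	if entry, ok := h.hwmCache[key]; ok && time.Now().Before(entry.expiresAt) {
		h.hwmCacheMu.RUnlock()
		return entry.value, nil
	}
	h.hwmCacheMu.RUnlock()

	// Slow path: ask the broker, then refresh the cache under the write lock.
	hwm, err := fetch()
	if err != nil {
		return 0, err
	}
	h.hwmCacheMu.Lock()
	h.hwmCache[key] = &hwmCacheEntry{value: hwm, expiresAt: time.Now().Add(h.hwmCacheTTL)}
	h.hwmCacheMu.Unlock()
	return hwm, nil
}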
// ConnectionContext holds connection-specific information for requests
// This is a local copy to avoid a circular dependency with the protocol package
type ConnectionContext struct {
ClientID string // Kafka client ID from request headers
ConsumerGroup string // Consumer group (set by JoinGroup)
MemberID string // Consumer group member ID (set by JoinGroup)
BrokerClient interface{} // Per-connection broker client (*BrokerClient)
}
// ProtocolHandler interface for accessing Handler's connection context
type ProtocolHandler interface {
GetConnectionContext() *ConnectionContext
}
// KafkaTopicInfo holds Kafka-specific topic information
type KafkaTopicInfo struct {
Name string
Partitions int32
CreatedAt int64
// SeaweedMQ integration
SeaweedTopic *schema_pb.Topic
}
// TopicPartitionKey uniquely identifies a topic partition
type TopicPartitionKey struct {
Topic string
Partition int32
}
// SeaweedRecord represents a record received from SeaweedMQ
type SeaweedRecord struct {
Key []byte
Value []byte
Timestamp int64
Offset int64
}
// PartitionRangeInfo contains comprehensive range information for a partition
type PartitionRangeInfo struct {
// Offset range information
EarliestOffset int64
LatestOffset int64
HighWaterMark int64
// Timestamp range information
EarliestTimestampNs int64
LatestTimestampNs int64
// Partition metadata
RecordCount int64
ActiveSubscriptions int64
}
// SeaweedSMQRecord implements the SMQRecord interface for SeaweedMQ records
type SeaweedSMQRecord struct {
key []byte
value []byte
timestamp int64
offset int64
}
// GetKey returns the record key
func (r *SeaweedSMQRecord) GetKey() []byte {
return r.key
}
// GetValue returns the record value
func (r *SeaweedSMQRecord) GetValue() []byte {
return r.value
}
// GetTimestamp returns the record timestamp
func (r *SeaweedSMQRecord) GetTimestamp() int64 {
return r.timestamp
}
// GetOffset returns the Kafka offset for this record
func (r *SeaweedSMQRecord) GetOffset() int64 {
return r.offset
}
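// toSMQRecordSketch shows how a SeaweedRecord coming back from the broker can
// be adapted to the SMQRecord interface above. The helper name is an
// assumption for illustration; the actual conversion may happen elsewhere.
func toSMQRecordSketch(rec *SeaweedRecord) SMQRecord {
	return &SeaweedSMQRecord{
		key:       rec.Key,
		value:     rec.Value,
		timestamp: rec.Timestamp,
		offset:    rec.Offset,
	}
}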
// FetchRequest tracks an in-flight fetch request with multiple waiters
type FetchRequest struct {
topic string
partition int32
offset int64
resultChan chan FetchResult // Single channel for the fetch result
waiters []chan FetchResult // Multiple waiters can subscribe
mu sync.Mutex
inProgress bool
}
// FetchResult contains the result of a fetch operation
type FetchResult struct {
records []*SeaweedRecord
err error
}
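// The two sketches below illustrate the deduplication pattern FetchRequest
// supports: concurrent fetches for the same topic/partition/offset register as
// waiters on a single in-flight request and all receive the same FetchResult.
// Method names and the fan-out strategy are assumptions; the real logic lives
// in BrokerClient.
func (fr *FetchRequest) addWaiterSketch() chan FetchResult {
	fr.mu.Lock()
	defer fr.mu.Unlock()
	ch := make(chan FetchResult, 1) // buffered so the broadcaster never blocks
	fr.waiters = append(fr.waiters, ch)
	return ch
}

func (fr *FetchRequest) broadcastSketch(result FetchResult) {
	fr.mu.Lock()
	defer fr.mu.Unlock()
	for _, ch := range fr.waiters {
		ch <- result // every waiter receives the same result
	}
	fr.waiters = nil
	fr.inProgress = false
}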
// partitionAssignmentCacheEntry caches LookupTopicBrokers results
type partitionAssignmentCacheEntry struct {
assignments []*mq_pb.BrokerPartitionAssignment
expiresAt time.Time
}
// BrokerClient wraps the SeaweedMQ Broker gRPC client for Kafka gateway integration
type BrokerClient struct {
// Reference to shared filer client accessor
filerClientAccessor *filer_client.FilerClientAccessor
brokerAddress string
conn *grpc.ClientConn
client mq_pb.SeaweedMessagingClient
// Publisher streams: topic-partition -> stream info
publishersLock sync.RWMutex
publishers map[string]*BrokerPublisherSession
// Publisher creation locks to prevent concurrent creation attempts for the same topic-partition
publisherCreationLocks map[string]*sync.Mutex
// Subscriber streams for offset tracking
subscribersLock sync.RWMutex
subscribers map[string]*BrokerSubscriberSession
// Request deduplication for stateless fetches
fetchRequestsLock sync.Mutex
fetchRequests map[string]*FetchRequest
// Partition assignment cache to reduce LookupTopicBrokers calls (13.5% CPU overhead!)
partitionAssignmentCache map[string]*partitionAssignmentCacheEntry // Key: topic name
partitionAssignmentCacheMu sync.RWMutex
partitionAssignmentCacheTTL time.Duration
ctx context.Context
cancel context.CancelFunc
}
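// publisherForSketch illustrates how the per-key creation locks above can keep
// two goroutines from concurrently opening a publish stream for the same
// topic-partition. It is a sketch: the method name and the caller-supplied
// create function are assumptions, not the package's real API.
func (c *BrokerClient) publisherForSketch(key string, create func() (*BrokerPublisherSession, error)) (*BrokerPublisherSession, error) {
	// Fast path: reuse an existing session.
	c.publishersLock.RLock()
	if s, ok := c.publishers[key]; ok {
		c.publishersLock.RUnlock()
		return s, nil
	}
	c.publishersLock.RUnlock()

	// Obtain (lazily creating) the per-key creation lock so only one goroutine
	// dials the broker for this topic-partition.
	c.publishersLock.Lock()
	lock, ok := c.publisherCreationLocks[key]
	if !ok {
		lock = &sync.Mutex{}
		c.publisherCreationLocks[key] = lock
	}
	c.publishersLock.Unlock()

	lock.Lock()
	defer lock.Unlock()

	// Re-check after acquiring the creation lock: another goroutine may have
	// already created the session while we waited.
	c.publishersLock.RLock()
	if s, ok := c.publishers[key]; ok {
		c.publishersLock.RUnlock()
		return s, nil
	}
	c.publishersLock.RUnlock()

	s, err := create()
	if err != nil {
		return nil, err
	}
	c.publishersLock.Lock()
	c.publishers[key] = s
	c.publishersLock.Unlock()
	return s, nil
}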
// BrokerPublisherSession tracks a publishing stream to SeaweedMQ broker
type BrokerPublisherSession struct {
Topic string
Partition int32
Stream mq_pb.SeaweedMessaging_PublishMessageClient
mu sync.Mutex // Protects Send/Recv pairs from concurrent access
}
// BrokerSubscriberSession tracks a subscription stream for offset management
type BrokerSubscriberSession struct {
Topic string
Partition int32
Stream mq_pb.SeaweedMessaging_SubscribeMessageClient
// Track the requested start offset used to initialize this stream
StartOffset int64
// Consumer group identity for this session
ConsumerGroup string
ConsumerID string
// Context for canceling reads (used for timeout)
Ctx context.Context
Cancel context.CancelFunc
// Mutex to serialize all operations on this session
mu sync.Mutex
// Cache of consumed records to avoid re-reading from broker
consumedRecords []*SeaweedRecord
nextOffsetToRead int64
// Track what has actually been READ from the stream (not what was requested)
// This is the HIGHEST offset that has been read from the stream
// Used to determine if we need to seek or can continue reading
lastReadOffset int64
// Flag to indicate if this session has been initialized
initialized bool
}
// Key generates a unique key for this subscriber session
// Includes consumer group and ID to prevent different consumers from sharing sessions
func (s *BrokerSubscriberSession) Key() string {
return fmt.Sprintf("%s-%d-%s-%s", s.Topic, s.Partition, s.ConsumerGroup, s.ConsumerID)
}
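// needsSeekSketch illustrates the seek-or-continue decision that lastReadOffset
// supports: if a fetch asks for the offset immediately after what has already
// been read, the existing stream can keep reading; otherwise the session has to
// seek or be re-created. The method name is an assumption for illustration.
func (s *BrokerSubscriberSession) needsSeekSketch(requestedOffset int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.initialized {
		return true // nothing has been read yet; the stream must be positioned first
	}
	// Continue only when the request picks up exactly where reading stopped.
	return requestedOffset != s.lastReadOffset+1
}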