1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
package integration
import (
"context"
"fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration
func NewSeaweedMQBrokerHandler(masters string, filerGroup string, clientHost string) (*SeaweedMQHandler, error) {
if masters == "" {
return nil, fmt.Errorf("masters required - SeaweedMQ infrastructure must be configured")
}
// Parse master addresses using SeaweedFS utilities
masterServerAddresses := pb.ServerAddresses(masters).ToAddresses()
if len(masterServerAddresses) == 0 {
return nil, fmt.Errorf("no valid master addresses provided")
}
// Load security configuration for gRPC connections
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq")
masterDiscovery := pb.ServerAddresses(masters).ToServiceDiscovery()
// Use provided client host for proper gRPC connection
// This is critical for MasterClient to establish streaming connections
clientHostAddr := pb.ServerAddress(clientHost)
masterClient := wdclient.NewMasterClient(grpcDialOption, filerGroup, "kafka-gateway", clientHostAddr, "", "", *masterDiscovery)
glog.V(1).Infof("Created MasterClient with clientHost=%s, masters=%s", clientHost, masters)
// Start KeepConnectedToMaster in background to maintain connection
glog.V(1).Infof("Starting KeepConnectedToMaster background goroutine...")
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
masterClient.KeepConnectedToMaster(ctx)
}()
// Give the connection a moment to establish
time.Sleep(2 * time.Second)
glog.V(1).Infof("Initial connection delay completed")
// Discover brokers from masters using master client
glog.V(1).Infof("About to call discoverBrokersWithMasterClient...")
brokerAddresses, err := discoverBrokersWithMasterClient(masterClient, filerGroup)
if err != nil {
glog.Errorf("Broker discovery failed: %v", err)
return nil, fmt.Errorf("failed to discover brokers: %v", err)
}
glog.V(1).Infof("Broker discovery returned: %v", brokerAddresses)
if len(brokerAddresses) == 0 {
return nil, fmt.Errorf("no brokers discovered from masters")
}
// Discover filers from masters using master client
filerAddresses, err := discoverFilersWithMasterClient(masterClient, filerGroup)
if err != nil {
return nil, fmt.Errorf("failed to discover filers: %v", err)
}
// Create shared filer client accessor for all components
sharedFilerAccessor := filer_client.NewFilerClientAccessor(
filerAddresses,
grpcDialOption,
)
// For now, use the first broker (can be enhanced later for load balancing)
brokerAddress := brokerAddresses[0]
// Create broker client with shared filer accessor
brokerClient, err := NewBrokerClientWithFilerAccessor(brokerAddress, sharedFilerAccessor)
if err != nil {
return nil, fmt.Errorf("failed to create broker client: %v", err)
}
// Test the connection
if err := brokerClient.HealthCheck(); err != nil {
brokerClient.Close()
return nil, fmt.Errorf("broker health check failed: %v", err)
}
return &SeaweedMQHandler{
filerClientAccessor: sharedFilerAccessor,
brokerClient: brokerClient,
masterClient: masterClient,
// topics map removed - always read from filer directly
// ledgers removed - SMQ broker handles all offset management
brokerAddresses: brokerAddresses, // Store all discovered broker addresses
hwmCache: make(map[string]*hwmCacheEntry),
hwmCacheTTL: 100 * time.Millisecond, // 100ms cache TTL for fresh HWM reads (critical for Schema Registry)
topicExistsCache: make(map[string]*topicExistsCacheEntry),
topicExistsCacheTTL: 5 * time.Second, // 5 second cache TTL for topic existence
}, nil
}
// discoverBrokersWithMasterClient queries masters for available brokers using reusable master client
func discoverBrokersWithMasterClient(masterClient *wdclient.MasterClient, filerGroup string) ([]string, error) {
var brokers []string
err := masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
glog.V(1).Infof("Inside MasterClient.WithClient callback - client obtained successfully")
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.BrokerType,
FilerGroup: filerGroup,
Limit: 1000,
})
if err != nil {
return err
}
glog.V(1).Infof("list cluster nodes successful - found %d cluster nodes", len(resp.ClusterNodes))
// Extract broker addresses from response
for _, node := range resp.ClusterNodes {
if node.Address != "" {
brokers = append(brokers, node.Address)
glog.V(1).Infof("discovered broker: %s", node.Address)
}
}
return nil
})
if err != nil {
glog.Errorf("MasterClient.WithClient failed: %v", err)
} else {
glog.V(1).Infof("Broker discovery completed successfully - found %d brokers: %v", len(brokers), brokers)
}
return brokers, err
}
// discoverFilersWithMasterClient queries masters for available filers using reusable master client
func discoverFilersWithMasterClient(masterClient *wdclient.MasterClient, filerGroup string) ([]pb.ServerAddress, error) {
var filers []pb.ServerAddress
err := masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
FilerGroup: filerGroup,
Limit: 1000,
})
if err != nil {
return err
}
// Extract filer addresses from response - return as HTTP addresses (pb.ServerAddress)
for _, node := range resp.ClusterNodes {
if node.Address != "" {
// Return HTTP address as pb.ServerAddress (no pre-conversion to gRPC)
httpAddr := pb.ServerAddress(node.Address)
filers = append(filers, httpAddr)
}
}
return nil
})
return filers, err
}
// GetFilerClientAccessor returns the shared filer client accessor
func (h *SeaweedMQHandler) GetFilerClientAccessor() *filer_client.FilerClientAccessor {
return h.filerClientAccessor
}
// SetProtocolHandler sets the protocol handler reference for accessing connection context
func (h *SeaweedMQHandler) SetProtocolHandler(handler ProtocolHandler) {
h.protocolHandler = handler
}
// GetBrokerAddresses returns the discovered SMQ broker addresses
func (h *SeaweedMQHandler) GetBrokerAddresses() []string {
return h.brokerAddresses
}
// Close shuts down the handler and all connections
func (h *SeaweedMQHandler) Close() error {
if h.brokerClient != nil {
return h.brokerClient.Close()
}
return nil
}
// CreatePerConnectionBrokerClient creates a new BrokerClient instance for a specific connection
// CRITICAL: Each Kafka TCP connection gets its own BrokerClient to prevent gRPC stream interference
// This fixes the deadlock where CreateFreshSubscriber would block all connections
func (h *SeaweedMQHandler) CreatePerConnectionBrokerClient() (*BrokerClient, error) {
// Use the same broker addresses as the shared client
if len(h.brokerAddresses) == 0 {
return nil, fmt.Errorf("no broker addresses available")
}
// Use the first broker address (in production, could use load balancing)
brokerAddress := h.brokerAddresses[0]
// Create a new client with the shared filer accessor
client, err := NewBrokerClientWithFilerAccessor(brokerAddress, h.filerClientAccessor)
if err != nil {
return nil, fmt.Errorf("failed to create broker client: %w", err)
}
return client, nil
}
|