aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_server.go')
-rw-r--r--weed/mq/broker/broker_server.go57
1 files changed, 46 insertions, 11 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 714348798..429a76df1 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -32,12 +32,18 @@ type MessageQueueBrokerOption struct {
Port int
Cipher bool
VolumeServerAccess string // how to access volume servers
+ LogFlushInterval int // log buffer flush interval in seconds
}
func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress {
return pb.NewServerAddress(option.Ip, option.Port, 0)
}
+type topicExistsCacheEntry struct {
+ exists bool
+ expiresAt time.Time
+}
+
type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
option *MessageQueueBrokerOption
@@ -48,9 +54,18 @@ type MessageQueueBroker struct {
localTopicManager *topic.LocalTopicManager
PubBalancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
- SubCoordinator *sub_coordinator.SubCoordinator
- accessLock sync.Mutex
- fca *filer_client.FilerClientAccessor
+ // TODO: Add native offset management to broker
+ // ASSUMPTION: BrokerOffsetManager handles all partition offset assignment
+ offsetManager *BrokerOffsetManager
+ SubCoordinator *sub_coordinator.SubCoordinator
+ // Removed gatewayRegistry - no longer needed
+ accessLock sync.Mutex
+ fca *filer_client.FilerClientAccessor
+ // TopicExists cache to reduce filer lookups
+ // Caches both positive (topic exists) and negative (topic doesn't exist) results
+ topicExistsCache map[string]*topicExistsCacheEntry
+ topicExistsCacheMu sync.RWMutex
+ topicExistsCacheTTL time.Duration
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -59,17 +74,27 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
subCoordinator := sub_coordinator.NewSubCoordinator()
mqBroker = &MessageQueueBroker{
- option: option,
- grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
- filers: make(map[pb.ServerAddress]struct{}),
- localTopicManager: topic.NewLocalTopicManager(),
- PubBalancer: pubBalancer,
- SubCoordinator: subCoordinator,
+ option: option,
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
+ filers: make(map[pb.ServerAddress]struct{}),
+ localTopicManager: topic.NewLocalTopicManager(),
+ PubBalancer: pubBalancer,
+ SubCoordinator: subCoordinator,
+ offsetManager: nil, // Will be initialized below
+ topicExistsCache: make(map[string]*topicExistsCacheEntry),
+ topicExistsCacheTTL: 30 * time.Second, // Cache for 30 seconds to reduce filer load
}
+ // Create FilerClientAccessor that adapts broker's single filer to the new multi-filer interface
fca := &filer_client.FilerClientAccessor{
- GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption,
+ GetFilers: func() []pb.ServerAddress {
+ filer := mqBroker.GetFiler()
+ if filer != "" {
+ return []pb.ServerAddress{filer}
+ }
+ return []pb.ServerAddress{}
+ },
}
mqBroker.fca = fca
subCoordinator.FilerClientAccessor = fca
@@ -79,6 +104,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
go mqBroker.MasterClient.KeepConnectedToMaster(context.Background())
+ // Initialize offset manager using the filer accessor
+ // The filer accessor will automatically use the current filer address as it gets discovered
+ // No hardcoded namespace/topic - offset storage now derives paths from actual topic information
+ mqBroker.offsetManager = NewBrokerOffsetManagerWithFilerAccessor(fca)
+ glog.V(0).Infof("broker initialized offset manager with filer accessor (current filer: %s)", mqBroker.GetFiler())
+
existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(context.Background()), grpcDialOption, option.FilerGroup, cluster.FilerType)
for _, newNode := range existingNodes {
mqBroker.OnBrokerUpdate(newNode, time.Now())
@@ -114,12 +145,16 @@ func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate,
b.filers[address] = struct{}{}
if b.currentFiler == "" {
b.currentFiler = address
+ // The offset manager will automatically use the updated filer through the filer accessor
+ glog.V(0).Infof("broker discovered filer %s (offset manager will automatically use it via filer accessor)", address)
}
} else {
delete(b.filers, address)
if b.currentFiler == address {
for filer := range b.filers {
b.currentFiler = filer
+ // The offset manager will automatically use the new filer through the filer accessor
+ glog.V(0).Infof("broker switched to filer %s (offset manager will automatically use it)", filer)
break
}
}