diff options
Diffstat (limited to 'weed/mq/broker/broker_server.go')
| -rw-r--r-- | weed/mq/broker/broker_server.go | 57 |
1 files changed, 46 insertions, 11 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 714348798..429a76df1 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -32,12 +32,18 @@ type MessageQueueBrokerOption struct { Port int Cipher bool VolumeServerAccess string // how to access volume servers + LogFlushInterval int // log buffer flush interval in seconds } func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress { return pb.NewServerAddress(option.Ip, option.Port, 0) } +type topicExistsCacheEntry struct { + exists bool + expiresAt time.Time +} + type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer option *MessageQueueBrokerOption @@ -48,9 +54,18 @@ type MessageQueueBroker struct { localTopicManager *topic.LocalTopicManager PubBalancer *pub_balancer.PubBalancer lockAsBalancer *cluster.LiveLock - SubCoordinator *sub_coordinator.SubCoordinator - accessLock sync.Mutex - fca *filer_client.FilerClientAccessor + // TODO: Add native offset management to broker + // ASSUMPTION: BrokerOffsetManager handles all partition offset assignment + offsetManager *BrokerOffsetManager + SubCoordinator *sub_coordinator.SubCoordinator + // Removed gatewayRegistry - no longer needed + accessLock sync.Mutex + fca *filer_client.FilerClientAccessor + // TopicExists cache to reduce filer lookups + // Caches both positive (topic exists) and negative (topic doesn't exist) results + topicExistsCache map[string]*topicExistsCacheEntry + topicExistsCacheMu sync.RWMutex + topicExistsCacheTTL time.Duration } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -59,17 +74,27 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial subCoordinator := sub_coordinator.NewSubCoordinator() mqBroker = &MessageQueueBroker{ - option: option, - grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), - filers: make(map[pb.ServerAddress]struct{}), - localTopicManager: topic.NewLocalTopicManager(), - PubBalancer: pubBalancer, - SubCoordinator: subCoordinator, + option: option, + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, option.BrokerAddress(), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), + filers: make(map[pb.ServerAddress]struct{}), + localTopicManager: topic.NewLocalTopicManager(), + PubBalancer: pubBalancer, + SubCoordinator: subCoordinator, + offsetManager: nil, // Will be initialized below + topicExistsCache: make(map[string]*topicExistsCacheEntry), + topicExistsCacheTTL: 30 * time.Second, // Cache for 30 seconds to reduce filer load } + // Create FilerClientAccessor that adapts broker's single filer to the new multi-filer interface fca := &filer_client.FilerClientAccessor{ - GetFiler: mqBroker.GetFiler, GetGrpcDialOption: mqBroker.GetGrpcDialOption, + GetFilers: func() []pb.ServerAddress { + filer := mqBroker.GetFiler() + if filer != "" { + return []pb.ServerAddress{filer} + } + return []pb.ServerAddress{} + }, } mqBroker.fca = fca subCoordinator.FilerClientAccessor = fca @@ -79,6 +104,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial go mqBroker.MasterClient.KeepConnectedToMaster(context.Background()) + // Initialize offset manager using the filer accessor + // The filer accessor will automatically use the current filer address as it gets discovered + // No hardcoded namespace/topic - offset storage now derives paths from actual topic information + mqBroker.offsetManager = NewBrokerOffsetManagerWithFilerAccessor(fca) + glog.V(0).Infof("broker initialized offset manager with filer accessor (current filer: %s)", mqBroker.GetFiler()) + existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(context.Background()), grpcDialOption, option.FilerGroup, cluster.FilerType) for _, newNode := range existingNodes { mqBroker.OnBrokerUpdate(newNode, time.Now()) @@ -114,12 +145,16 @@ func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, b.filers[address] = struct{}{} if b.currentFiler == "" { b.currentFiler = address + // The offset manager will automatically use the updated filer through the filer accessor + glog.V(0).Infof("broker discovered filer %s (offset manager will automatically use it via filer accessor)", address) } } else { delete(b.filers, address) if b.currentFiler == address { for filer := range b.filers { b.currentFiler = filer + // The offset manager will automatically use the new filer through the filer accessor + glog.V(0).Infof("broker switched to filer %s (offset manager will automatically use it)", filer) break } } |
