diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/balancer/balancer.go | 21 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_balancer.go | 23 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 22 | ||||
| -rw-r--r-- | weed/mq/broker/broker_stats.go | 74 | ||||
| -rw-r--r-- | weed/mq/topic/local_manager.go | 22 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 1 |
6 files changed, 137 insertions, 26 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go index 4c5f8f3c8..74871925f 100644 --- a/weed/mq/balancer/balancer.go +++ b/weed/mq/balancer/balancer.go @@ -1,7 +1,6 @@ package balancer import ( - "fmt" cmap "github.com/orcaman/concurrent-map/v2" ) @@ -10,24 +9,10 @@ type Balancer struct { } type BrokerStats struct { TopicPartitionCount int32 - MessageCount int64 - BytesCount int64 + ConsumerCount int32 CpuUsagePercent int32 } -type TopicPartition struct { - Topic string - RangeStart int32 - RangeStop int32 -} - -type TopicPartitionStats struct { - TopicPartition - Throughput int64 - ConsumerCount int64 - TopicPartitionCount int64 -} - func NewBalancer() *Balancer { return &Balancer{ Brokers: cmap.New[*BrokerStats](), @@ -37,7 +22,3 @@ func NewBalancer() *Balancer { func NewBrokerStats() *BrokerStats { return &BrokerStats{} } - -func (tp *TopicPartition) String() string { - return fmt.Sprintf("%v-%04d-%04d", tp.Topic, tp.RangeStart, tp.RangeStop) -} diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_balancer.go index cdc1f2ace..c4a0357a2 100644 --- a/weed/mq/broker/broker_grpc_balancer.go +++ b/weed/mq/broker/broker_grpc_balancer.go @@ -1,39 +1,50 @@ package broker import ( + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// BrokerConnectToBalancer receives connections from brokers and collects stats func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error { + if !broker.lockAsBalancer.IsLocked() { + return status.Errorf(codes.Unavailable, "not current broker balancer") + } req, err := stream.Recv() if err != nil { return err } - response := &mq_pb.ConnectToBalancerResponse{} + + // process init message initMessage := req.GetInit() brokerStats := balancer.NewBrokerStats() if initMessage != nil { broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats) } else { - response.Error = "balancer init message is empty" - return stream.Send(response) + return status.Errorf(codes.InvalidArgument, "balancer init message is empty") } defer func() { broker.Balancer.Brokers.Remove(initMessage.Broker) }() - stream.Send(response) + // process stats message for { req, err := stream.Recv() if err != nil { return err } + if !broker.lockAsBalancer.IsLocked() { + return status.Errorf(codes.Unavailable, "not current broker balancer") + } if receivedStats := req.GetStats(); receivedStats != nil { brokerStats.TopicPartitionCount = receivedStats.TopicPartitionCount - brokerStats.MessageCount = receivedStats.MessageCount - brokerStats.BytesCount = receivedStats.BytesCount + brokerStats.ConsumerCount = receivedStats.ConsumerCount brokerStats.CpuUsagePercent = receivedStats.CpuUsagePercent + + glog.V(3).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats) } } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 90c6179cb..db8329989 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,6 +1,8 @@ package broker import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "time" @@ -36,6 +38,7 @@ type MessageQueueBroker struct { currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager Balancer *balancer.Balancer + lockAsBalancer *cluster.LiveLock } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -57,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker.OnBrokerUpdate(newNode, time.Now()) } + // keep connecting to balancer + go func() { + for mqBroker.currentFiler == "" { + time.Sleep(time.Millisecond * 237) + } + self := fmt.Sprintf("%s:%d", option.Ip, option.Port) + glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler) + + lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) + mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self) + for { + err := mqBroker.BrokerConnectToBalancer(self) + if err != nil { + fmt.Printf("BrokerConnectToBalancer: %v\n", err) + } + time.Sleep(time.Second) + } + }() + return mqBroker, nil } diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go new file mode 100644 index 000000000..20dd7039e --- /dev/null +++ b/weed/mq/broker/broker_stats.go @@ -0,0 +1,74 @@ +package broker + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "math/rand" + "time" +) + +const ( + LockBrokerBalancer = "broker_balancer" +) + +// BrokerConnectToBalancer connects to the broker balancer and sends stats +func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error { + // find the lock owner + var brokerBalancer string + err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{ + Name: LockBrokerBalancer, + }) + if err != nil { + return err + } + brokerBalancer = resp.Owner + return nil + }) + if err != nil { + return err + } + + glog.V(1).Infof("broker %s found balancer %s", self, brokerBalancer) + + // connect to the lock owner + err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + stream, err := client.ConnectToBalancer(context.Background()) + if err != nil { + return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err) + } + defer stream.CloseSend() + err = stream.Send(&mq_pb.ConnectToBalancerRequest{ + Message: &mq_pb.ConnectToBalancerRequest_Init{ + Init: &mq_pb.ConnectToBalancerRequest_InitMessage{ + Broker: self, + }, + }, + }) + if err != nil { + return fmt.Errorf("send init message: %v", err) + } + + for { + stats := broker.localTopicManager.CollectStats(time.Second * 5) + err = stream.Send(&mq_pb.ConnectToBalancerRequest{ + Message: &mq_pb.ConnectToBalancerRequest_Stats{ + Stats: stats, + }, + }) + if err != nil { + return fmt.Errorf("send stats message: %v", err) + } + + time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) + } + + return nil + }) + + return err +} diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 6e7db5d08..d0d9def19 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -2,6 +2,9 @@ package topic import ( cmap "github.com/orcaman/concurrent-map/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/shirou/gopsutil/v3/cpu" + "time" ) // LocalTopicManager manages topics on local broker @@ -53,3 +56,22 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa } return localTopic.removePartition(partition) } + +func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats { + stats := &mq_pb.BrokerStats{} + manager.topics.IterCb(func(topic string, localTopic *LocalTopic) { + for _, localPartition := range localTopic.Partitions { + stats.TopicPartitionCount++ + stats.ConsumerCount += localPartition.ConsumerCount + } + }) + + // collect current broker's cpu usage + usages, err := cpu.Percent(duration, false) + if err == nil && len(usages) > 0 { + stats.CpuUsagePercent = int32(usages[0]) + } + + return stats + +} diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index eaedb9f20..49b639dfa 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -14,6 +14,7 @@ type LocalPartition struct { isLeader bool FollowerBrokers []pb.ServerAddress logBuffer *log_buffer.LogBuffer + ConsumerCount int32 } func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition { |
