aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/balancer/balancer.go21
-rw-r--r--weed/mq/broker/broker_grpc_balancer.go23
-rw-r--r--weed/mq/broker/broker_server.go22
-rw-r--r--weed/mq/broker/broker_stats.go74
-rw-r--r--weed/mq/topic/local_manager.go22
-rw-r--r--weed/mq/topic/local_partition.go1
6 files changed, 137 insertions, 26 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 4c5f8f3c8..74871925f 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -1,7 +1,6 @@
package balancer
import (
- "fmt"
cmap "github.com/orcaman/concurrent-map/v2"
)
@@ -10,24 +9,10 @@ type Balancer struct {
}
type BrokerStats struct {
TopicPartitionCount int32
- MessageCount int64
- BytesCount int64
+ ConsumerCount int32
CpuUsagePercent int32
}
-type TopicPartition struct {
- Topic string
- RangeStart int32
- RangeStop int32
-}
-
-type TopicPartitionStats struct {
- TopicPartition
- Throughput int64
- ConsumerCount int64
- TopicPartitionCount int64
-}
-
func NewBalancer() *Balancer {
return &Balancer{
Brokers: cmap.New[*BrokerStats](),
@@ -37,7 +22,3 @@ func NewBalancer() *Balancer {
func NewBrokerStats() *BrokerStats {
return &BrokerStats{}
}
-
-func (tp *TopicPartition) String() string {
- return fmt.Sprintf("%v-%04d-%04d", tp.Topic, tp.RangeStart, tp.RangeStop)
-}
diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_balancer.go
index cdc1f2ace..c4a0357a2 100644
--- a/weed/mq/broker/broker_grpc_balancer.go
+++ b/weed/mq/broker/broker_grpc_balancer.go
@@ -1,39 +1,50 @@
package broker
import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
+// BrokerConnectToBalancer receives connections from brokers and collects stats
func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
+ if !broker.lockAsBalancer.IsLocked() {
+ return status.Errorf(codes.Unavailable, "not current broker balancer")
+ }
req, err := stream.Recv()
if err != nil {
return err
}
- response := &mq_pb.ConnectToBalancerResponse{}
+
+ // process init message
initMessage := req.GetInit()
brokerStats := balancer.NewBrokerStats()
if initMessage != nil {
broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats)
} else {
- response.Error = "balancer init message is empty"
- return stream.Send(response)
+ return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
}
defer func() {
broker.Balancer.Brokers.Remove(initMessage.Broker)
}()
- stream.Send(response)
+ // process stats message
for {
req, err := stream.Recv()
if err != nil {
return err
}
+ if !broker.lockAsBalancer.IsLocked() {
+ return status.Errorf(codes.Unavailable, "not current broker balancer")
+ }
if receivedStats := req.GetStats(); receivedStats != nil {
brokerStats.TopicPartitionCount = receivedStats.TopicPartitionCount
- brokerStats.MessageCount = receivedStats.MessageCount
- brokerStats.BytesCount = receivedStats.BytesCount
+ brokerStats.ConsumerCount = receivedStats.ConsumerCount
brokerStats.CpuUsagePercent = receivedStats.CpuUsagePercent
+
+ glog.V(3).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats)
}
}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 90c6179cb..db8329989 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,6 +1,8 @@
package broker
import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
@@ -36,6 +38,7 @@ type MessageQueueBroker struct {
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
Balancer *balancer.Balancer
+ lockAsBalancer *cluster.LiveLock
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -57,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker.OnBrokerUpdate(newNode, time.Now())
}
+ // keep connecting to balancer
+ go func() {
+ for mqBroker.currentFiler == "" {
+ time.Sleep(time.Millisecond * 237)
+ }
+ self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
+ glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+
+ lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
+ mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
+ for {
+ err := mqBroker.BrokerConnectToBalancer(self)
+ if err != nil {
+ fmt.Printf("BrokerConnectToBalancer: %v\n", err)
+ }
+ time.Sleep(time.Second)
+ }
+ }()
+
return mqBroker, nil
}
diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go
new file mode 100644
index 000000000..20dd7039e
--- /dev/null
+++ b/weed/mq/broker/broker_stats.go
@@ -0,0 +1,74 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "math/rand"
+ "time"
+)
+
+const (
+ LockBrokerBalancer = "broker_balancer"
+)
+
+// BrokerConnectToBalancer connects to the broker balancer and sends stats
+func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
+ // find the lock owner
+ var brokerBalancer string
+ err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
+ Name: LockBrokerBalancer,
+ })
+ if err != nil {
+ return err
+ }
+ brokerBalancer = resp.Owner
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+
+ glog.V(1).Infof("broker %s found balancer %s", self, brokerBalancer)
+
+ // connect to the lock owner
+ err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ stream, err := client.ConnectToBalancer(context.Background())
+ if err != nil {
+ return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
+ }
+ defer stream.CloseSend()
+ err = stream.Send(&mq_pb.ConnectToBalancerRequest{
+ Message: &mq_pb.ConnectToBalancerRequest_Init{
+ Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
+ Broker: self,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("send init message: %v", err)
+ }
+
+ for {
+ stats := broker.localTopicManager.CollectStats(time.Second * 5)
+ err = stream.Send(&mq_pb.ConnectToBalancerRequest{
+ Message: &mq_pb.ConnectToBalancerRequest_Stats{
+ Stats: stats,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("send stats message: %v", err)
+ }
+
+ time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
+ }
+
+ return nil
+ })
+
+ return err
+}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 6e7db5d08..d0d9def19 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -2,6 +2,9 @@ package topic
import (
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/shirou/gopsutil/v3/cpu"
+ "time"
)
// LocalTopicManager manages topics on local broker
@@ -53,3 +56,22 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa
}
return localTopic.removePartition(partition)
}
+
+func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
+ stats := &mq_pb.BrokerStats{}
+ manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
+ for _, localPartition := range localTopic.Partitions {
+ stats.TopicPartitionCount++
+ stats.ConsumerCount += localPartition.ConsumerCount
+ }
+ })
+
+ // collect current broker's cpu usage
+ usages, err := cpu.Percent(duration, false)
+ if err == nil && len(usages) > 0 {
+ stats.CpuUsagePercent = int32(usages[0])
+ }
+
+ return stats
+
+}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index eaedb9f20..49b639dfa 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -14,6 +14,7 @@ type LocalPartition struct {
isLeader bool
FollowerBrokers []pb.ServerAddress
logBuffer *log_buffer.LogBuffer
+ ConsumerCount int32
}
func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {