aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-24 23:34:31 -0700
committerchrislu <chris.lu@gmail.com>2023-09-24 23:34:31 -0700
commitc7e05e4e716e43541bdf8b3486166d8566ae8582 (patch)
tree3deade16b7cbf335af3f5b24f7f7d799956210fa
parentdff2ce5d2fa96693a0b4d55d314976f155fb902c (diff)
downloadseaweedfs-c7e05e4e716e43541bdf8b3486166d8566ae8582.tar.xz
seaweedfs-c7e05e4e716e43541bdf8b3486166d8566ae8582.zip
ensure latest stats are reported
-rw-r--r--weed/mq/broker/broker_grpc_balancer.go4
-rw-r--r--weed/mq/broker/broker_grpc_create.go6
-rw-r--r--weed/mq/broker/broker_stats.go2
-rw-r--r--weed/mq/topic/local_manager.go19
4 files changed, 21 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_balancer.go
index e602f4bad..079a1612f 100644
--- a/weed/mq/broker/broker_grpc_balancer.go
+++ b/weed/mq/broker/broker_grpc_balancer.go
@@ -26,7 +26,9 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker)
if !found {
brokerStats = balancer.NewBrokerStats()
- broker.Balancer.Brokers.Set(initMessage.Broker, brokerStats)
+ if !broker.Balancer.Brokers.SetIfAbsent(initMessage.Broker, brokerStats) {
+ brokerStats, _ = broker.Balancer.Brokers.Get(initMessage.Broker)
+ }
}
} else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go
index 44c2d4ee9..0e76899fa 100644
--- a/weed/mq/broker/broker_grpc_create.go
+++ b/weed/mq/broker/broker_grpc_create.go
@@ -43,7 +43,9 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = balancer.NewBrokerStats()
- broker.Balancer.Brokers.Set(bpa.LeaderBroker, brokerStats)
+ if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker)
+ }
}
brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
return nil
@@ -52,6 +54,8 @@ func (broker *MessageQueueBroker) CreateTopic(ctx context.Context, request *mq_p
}
}
+ // TODO revert if some error happens in the middle of the assignments
+
return ret, err
}
diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go
index 2c06f4668..eb9246e4a 100644
--- a/weed/mq/broker/broker_stats.go
+++ b/weed/mq/broker/broker_stats.go
@@ -61,7 +61,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
if err != nil {
return fmt.Errorf("send stats message: %v", err)
}
- glog.V(4).Infof("sent stats: %+v", stats)
+ glog.V(3).Infof("sent stats: %+v", stats)
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 432fa4153..a2bc23f24 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -28,7 +28,9 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
Partitions: make([]*LocalPartition, 0),
}
}
- manager.topics.SetIfAbsent(topic.String(), localTopic)
+ if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
+ localTopic, _ = manager.topics.Get(topic.String())
+ }
if localTopic.findPartition(localPartition.Partition) != nil {
return
}
@@ -61,6 +63,15 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
stats := &mq_pb.BrokerStats{
Stats: make(map[string]*mq_pb.TopicPartitionStats),
}
+
+ // collect current broker's cpu usage
+ // this needs to be in front, so the following stats can be more accurate
+ usages, err := cpu.Percent(duration, false)
+ if err == nil && len(usages) > 0 {
+ stats.CpuUsagePercent = int32(usages[0])
+ }
+
+ // collect current broker's topics and partitions
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{
@@ -85,12 +96,6 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
}
})
- // collect current broker's cpu usage
- usages, err := cpu.Percent(duration, false)
- if err == nil && len(usages) > 0 {
- stats.CpuUsagePercent = int32(usages[0])
- }
-
return stats
}