author     chrislu <chris.lu@gmail.com>  2024-05-20 11:05:18 -0700
committer  chrislu <chris.lu@gmail.com>  2024-05-20 11:05:18 -0700
commit     362219688127097beef288a69bf8df45566e1ec7 (patch)
tree       0999be4d7067550443de921e92ac8fd3d7af0f34
parent     5038577f7e6d6137f70ce3f0499142d480768312 (diff)
go fmt
-rw-r--r--  weed/mq/broker/broker_grpc_sub.go                          |  6
-rw-r--r--  weed/mq/broker/broker_grpc_sub_follow.go                   |  3
-rw-r--r--  weed/mq/broker/broker_server.go                            |  5
-rw-r--r--  weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go            |  2
-rw-r--r--  weed/mq/client/cmd/weed_sub_record/subscriber_record.go    |  4
-rw-r--r--  weed/mq/client/pub_client/scheduler.go                     | 10
-rw-r--r--  weed/mq/client/sub_client/connect_to_sub_coordinator.go    | 20
-rw-r--r--  weed/mq/client/sub_client/subscribe.go                     |  1
-rw-r--r--  weed/mq/client/sub_client/subscriber.go                    | 16
-rw-r--r--  weed/mq/sub_coordinator/consumer_group.go                  |  8
-rw-r--r--  weed/mq/sub_coordinator/coordinator.go                     |  4
-rw-r--r--  weed/mq/sub_coordinator/filer_client_accessor.go           |  4
-rw-r--r--  weed/mq/sub_coordinator/inflight_message_tracker.go        | 15
-rw-r--r--  weed/mq/sub_coordinator/partition_consumer_mapping.go      |  8
-rw-r--r--  weed/mq/sub_coordinator/partition_consumer_mapping_test.go |  8
15 files changed, 58 insertions, 56 deletions
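Note: the hunks below are almost entirely gofmt fixes: struct-literal keys realigned on a common column, missing spaces added around "=>", ":=", and "} else {", and stray blank lines removed. A hypothetical example (not taken from this commit) of the kind of rewrite gofmt performs on a struct literal:

// Hypothetical illustration of gofmt key alignment; the type and values
// are made up, only the formatting behavior is the point.
package main

import "fmt"

type InitMessage struct {
	Topic         string
	Partition     int32
	ConsumerGroup string
}

func main() {
	// Before gofmt:
	//   m := InitMessage{
	//       Topic: "t",
	//       Partition: 1,
	//       ConsumerGroup: "cg",
	//   }
	// After gofmt, the values are aligned on one column:
	m := InitMessage{
		Topic:         "t",
		Partition:     1,
		ConsumerGroup: "cg",
	}
	fmt.Printf("%+v\n", m)
}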
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index ee434d37b..db2964121 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -75,8 +75,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Init{
Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
- Topic: req.GetInit().Topic,
- Partition: req.GetInit().GetPartitionOffset().Partition,
+ Topic: req.GetInit().Topic,
+ Partition: req.GetInit().GetPartitionOffset().Partition,
ConsumerGroup: req.GetInit().ConsumerGroup,
},
},
@@ -186,7 +186,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
return
}
- if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil{
+ if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index 5a77ceebb..eb041dc60 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -12,7 +12,6 @@ import (
"time"
)
-
func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
var req *mq_pb.SubscribeFollowMeRequest
req, err = stream.Recv()
@@ -72,7 +71,7 @@ func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.Subscrib
err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
if err != nil {
- return err
+ return err
}
if len(data) != 8 {
return fmt.Errorf("no offset found")
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index b3d86a401..640ccca10 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -47,7 +47,7 @@ type MessageQueueBroker struct {
lockAsBalancer *cluster.LiveLock
Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex
- fca *sub_coordinator.FilerClientAccessor
+ fca *sub_coordinator.FilerClientAccessor
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -65,7 +65,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
Coordinator: coordinator,
}
fca := &sub_coordinator.FilerClientAccessor{
- GetFiler: mqBroker.GetFiler,
+ GetFiler: mqBroker.GetFiler,
GetGrpcDialOption: mqBroker.GetGrpcDialOption,
}
mqBroker.fca = fca
@@ -130,7 +130,6 @@ func (b *MessageQueueBroker) GetGrpcDialOption() grpc.DialOption {
return b.grpcDialOption
}
-
func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
return b.currentFiler
}
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index 4bbb26032..b18865877 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -44,7 +44,7 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++
println(string(key), "=>", string(value), counter)
return nil
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index ed710fa57..7aa381c76 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -63,7 +63,7 @@ func main() {
}
processorConfig := sub_client.ProcessorConfiguration{
- MaxPartitionCount: 3,
+ MaxPartitionCount: 3,
PerPartitionConcurrency: 1,
}
@@ -71,7 +71,7 @@ func main() {
subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
counter := 0
- subscriber.SetEachMessageFunc(func(key, value []byte) (error) {
+ subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++
record := &schema_pb.RecordValue{}
proto.Unmarshal(value, record)
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index ef7cfb93f..df2270b2c 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -142,11 +142,11 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
- Topic: p.config.Topic.ToPbTopic(),
- Partition: job.Partition,
- AckInterval: 128,
- FollowerBroker: job.FollowerBroker,
- PublisherName: p.config.PublisherName,
+ Topic: p.config.Topic.ToPbTopic(),
+ Partition: job.Partition,
+ AckInterval: 128,
+ FollowerBroker: job.FollowerBroker,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 815694a48..118d91bac 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -51,7 +51,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerGroupInstanceId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
- MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount,
+ MaxPartitionCount: sub.ProcessorConfig.MaxPartitionCount,
},
},
}); err != nil {
@@ -105,12 +105,12 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
Partition: assigned.Partition,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
- Filter: sub.ContentConfig.Filter,
+ Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker,
- Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
+ Concurrency: sub.ProcessorConfig.PerPartitionConcurrency,
},
},
- });err != nil {
+ }); err != nil {
glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
@@ -120,16 +120,16 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
defer sub.OnCompletionFunc()
}
- partitionOffsetChan:= make(chan int64, 1024)
+ partitionOffsetChan := make(chan int64, 1024)
defer func() {
close(partitionOffsetChan)
}()
- concurrentPartitionLimit := int(sub.ProcessorConfig.MaxPartitionCount)
- if concurrentPartitionLimit <= 0 {
- concurrentPartitionLimit = 1
+ perPartitionConcurrency := int(sub.ProcessorConfig.PerPartitionConcurrency)
+ if perPartitionConcurrency <= 0 {
+ perPartitionConcurrency = 1
}
- executors := util.NewLimitedConcurrentExecutor(concurrentPartitionLimit)
+ executors := util.NewLimitedConcurrentExecutor(perPartitionConcurrency)
go func() {
for ack := range partitionOffsetChan {
@@ -162,7 +162,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {
partitionOffsetChan <- m.Data.TsNs
- }else{
+ } else {
lastErr = processErr
}
})
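Besides formatting, the hunk above also sizes the executor from PerPartitionConcurrency rather than MaxPartitionCount, so the bound now applies to concurrent message handlers within a partition. A minimal, self-contained sketch of that bounded-concurrency pattern, using a plain buffered-channel semaphore rather than SeaweedFS's util.NewLimitedConcurrentExecutor:

// Hypothetical sketch of limiting concurrent message handlers with a
// buffered-channel semaphore; not the SeaweedFS util implementation.
package main

import (
	"fmt"
	"sync"
)

func main() {
	perPartitionConcurrency := 4
	if perPartitionConcurrency <= 0 {
		perPartitionConcurrency = 1 // same guard as in the diff above
	}

	sem := make(chan struct{}, perPartitionConcurrency) // at most N handlers at once
	var wg sync.WaitGroup

	for i := 0; i < 20; i++ {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot
		go func(msg int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			fmt.Println("processed message", msg)
		}(i)
	}
	wg.Wait()
}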
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index ba20cf040..c519ca18b 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -9,7 +9,6 @@ import (
)
type ProcessorState struct {
-
}
// Subscribe subscribes to a topic's specified partitions.
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 95320b19a..cf41f2881 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -22,7 +22,7 @@ type ContentConfiguration struct {
}
type ProcessorConfiguration struct {
- MaxPartitionCount int32 // how many partitions to process concurrently
+ MaxPartitionCount int32 // how many partitions to process concurrently
PerPartitionConcurrency int32 // how many messages to process concurrently per partition
}
@@ -30,16 +30,16 @@ type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
- SubscriberConfig *SubscriberConfiguration
- ContentConfig *ContentConfiguration
+ SubscriberConfig *SubscriberConfiguration
+ ContentConfig *ContentConfiguration
ProcessorConfig *ProcessorConfiguration
brokerPartitionAssignmentChan chan *mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
- OnCompletionFunc OnCompletionFunc
- bootstrapBrokers []string
- waitForMoreMessage bool
- activeProcessors map[topic.Partition]*ProcessorState
- activeProcessorsLock sync.Mutex
+ OnCompletionFunc OnCompletionFunc
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ activeProcessors map[topic.Partition]*ProcessorState
+ activeProcessorsLock sync.Mutex
}
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, processor ProcessorConfiguration) *TopicSubscriber {
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 8e804c7fb..f6a13217e 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -12,8 +12,8 @@ import (
type ConsumerGroupInstance struct {
InstanceId string
// the consumer group instance may not have an active partition
- Partitions []*topic.Partition
- ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ Partitions []*topic.Partition
+ ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
MaxPartitionCount int32
}
type ConsumerGroup struct {
@@ -43,10 +43,10 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
- cg.onConsumerGroupInstanceChange(true, "add consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
+ cg.onConsumerGroupInstanceChange(true, "add consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic, maxPartitionCount, rebalanceSeconds int32) {
- cg.onConsumerGroupInstanceChange(false, "remove consumer instance " + consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
+ cg.onConsumerGroupInstanceChange(false, "remove consumer instance "+consumerGroupInstance, maxPartitionCount, rebalanceSeconds)
}
func (cg *ConsumerGroup) onConsumerGroupInstanceChange(isAdd bool, reason string, maxPartitionCount, rebalanceSeconds int32) {
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 4eafbca57..4bb726f26 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -17,8 +17,8 @@ type TopicConsumerGroups struct {
type Coordinator struct {
// map topic name to consumer groups
- TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
- balancer *pub_balancer.Balancer
+ TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
+ balancer *pub_balancer.Balancer
FilerClientAccessor *FilerClientAccessor
}
diff --git a/weed/mq/sub_coordinator/filer_client_accessor.go b/weed/mq/sub_coordinator/filer_client_accessor.go
index 85bb5e29d..dc50ac128 100644
--- a/weed/mq/sub_coordinator/filer_client_accessor.go
+++ b/weed/mq/sub_coordinator/filer_client_accessor.go
@@ -14,8 +14,8 @@ import (
)
type FilerClientAccessor struct {
- GetFiler func() pb.ServerAddress
- GetGrpcDialOption func()grpc.DialOption
+ GetFiler func() pb.ServerAddress
+ GetGrpcDialOption func() grpc.DialOption
}
func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index 5e13ac427..f48335435 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -6,15 +6,15 @@ import (
)
type InflightMessageTracker struct {
- messages map[string]int64
- mu sync.Mutex
- timestamps *RingBuffer
+ messages map[string]int64
+ mu sync.Mutex
+ timestamps *RingBuffer
}
func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
return &InflightMessageTracker{
- messages: make(map[string]int64),
- timestamps: NewRingBuffer(capacity),
+ messages: make(map[string]int64),
+ timestamps: NewRingBuffer(capacity),
}
}
@@ -26,6 +26,7 @@ func (imt *InflightMessageTracker) InflightMessage(key []byte, tsNs int64) {
imt.messages[string(key)] = tsNs
imt.timestamps.Add(tsNs)
}
+
// IsMessageAcknowledged returns true if the message has been acknowledged.
// If the message is older than the oldest inflight messages, returns false.
// returns false if the message is inflight.
@@ -47,6 +48,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64)
return true
}
+
// AcknowledgeMessage acknowledges the message with the key and timestamp.
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
imt.mu.Lock()
@@ -71,12 +73,14 @@ type RingBuffer struct {
head int
size int
}
+
// NewRingBuffer creates a new RingBuffer of the given capacity.
func NewRingBuffer(capacity int) *RingBuffer {
return &RingBuffer{
buffer: make([]int64, capacity),
}
}
+
// Add adds a new timestamp to the ring buffer.
func (rb *RingBuffer) Add(timestamp int64) {
rb.buffer[rb.head] = timestamp
@@ -85,6 +89,7 @@ func (rb *RingBuffer) Add(timestamp int64) {
rb.size++
}
}
+
// Remove removes the specified timestamp from the ring buffer.
func (rb *RingBuffer) Remove(timestamp int64) {
// Perform binary search
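For context, this file tracks in-flight messages with a key-to-timestamp map plus a timestamp ring buffer. A much simpler, hypothetical illustration of the core map-plus-mutex bookkeeping (it omits the ring buffer and the ordering rules the real tracker enforces):

// Hypothetical, simplified in-flight tracker; an illustration only,
// not the SeaweedFS implementation above.
package main

import (
	"fmt"
	"sync"
)

type SimpleTracker struct {
	mu       sync.Mutex
	messages map[string]int64 // key -> send timestamp (ns)
}

func NewSimpleTracker() *SimpleTracker {
	return &SimpleTracker{messages: make(map[string]int64)}
}

// InflightMessage records that a message is being processed.
func (t *SimpleTracker) InflightMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.messages[string(key)] = tsNs
}

// AcknowledgeMessage removes a message once processed and reports
// whether it was actually in flight.
func (t *SimpleTracker) AcknowledgeMessage(key []byte) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.messages[string(key)]
	delete(t.messages, string(key))
	return ok
}

func main() {
	t := NewSimpleTracker()
	t.InflightMessage([]byte("k1"), 123)
	fmt.Println(t.AcknowledgeMessage([]byte("k1"))) // true
	fmt.Println(t.AcknowledgeMessage([]byte("k1"))) // false
}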
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
index 256d5e78c..23e0fb00f 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -82,10 +82,10 @@ func doBalanceSticky(partitions []*pub_balancer.PartitionSlotToBroker, consumerI
newPartitionSlots := make([]*PartitionSlotToConsumerInstance, 0, len(partitions))
for _, partition := range partitions {
newPartitionSlots = append(newPartitionSlots, &PartitionSlotToConsumerInstance{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- UnixTimeNs: partition.UnixTimeNs,
- Broker: partition.AssignedBroker,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ UnixTimeNs: partition.UnixTimeNs,
+ Broker: partition.AssignedBroker,
FollowerBroker: partition.FollowerBroker,
})
}
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
index 7dcfc6f9b..415eb27bd 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
@@ -32,7 +32,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1,
},
},
- prevMapping: nil,
+ prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -61,7 +61,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1,
},
},
- prevMapping: nil,
+ prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -90,7 +90,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1,
},
},
- prevMapping: nil,
+ prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{
@@ -128,7 +128,7 @@ func Test_doBalanceSticky(t *testing.T) {
MaxPartitionCount: 1,
},
},
- prevMapping: nil,
+ prevMapping: nil,
},
wantPartitionSlots: []*PartitionSlotToConsumerInstance{
{