diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-23 08:25:53 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-23 08:25:53 -0700 |
| commit | cdeaaf95b40336d924aae3383d80e6a708a36d2d (patch) | |
| tree | dba7d4fa09590d12d6a30c8206ca9370d97d0d08 | |
| parent | 77a7c5c2a038df73dd1861c5636af9afe8094c84 (diff) | |
| download | seaweedfs-cdeaaf95b40336d924aae3383d80e6a708a36d2d.tar.xz seaweedfs-cdeaaf95b40336d924aae3383d80e6a708a36d2d.zip | |
go fmt
| -rw-r--r-- | weed/mq/broker/broker_server.go | 6 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_record/subscriber_record.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 14 |
5 files changed, 19 insertions, 19 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index cdf652294..44d9deed6 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -44,9 +44,9 @@ type MessageQueueBroker struct { currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager PubBalancer *pub_balancer.PubBalancer - lockAsBalancer *cluster.LiveLock - SubCoordinator *sub_coordinator.SubCoordinator - accessLock sync.Mutex + lockAsBalancer *cluster.LiveLock + SubCoordinator *sub_coordinator.SubCoordinator + accessLock sync.Mutex fca *sub_coordinator.FilerClientAccessor } diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 792794a4a..902e7ed1b 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -14,10 +14,10 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go index 262da6c4d..674c881ba 100644 --- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go +++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go @@ -16,10 +16,10 @@ import ( ) var ( - namespace = flag.String("ns", "test", "namespace") - t = flag.String("topic", "test", "topic") - seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") - maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency") clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 8d5bf2044..6ee447085 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -65,7 +65,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig select { case <-stopCh: break - case ack := <- partitionOffsetChan: + case ack := <-partitionOffsetChan: subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index 129392e44..922593b77 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -27,16 +27,16 @@ type OnEachMessageFunc func(key, value []byte) (err error) type OnCompletionFunc func() type TopicSubscriber struct { - SubscriberConfig *SubscriberConfiguration - ContentConfig *ContentConfiguration + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest OnEachMessageFunc OnEachMessageFunc - OnCompletionFunc OnCompletionFunc - bootstrapBrokers []string - waitForMoreMessage bool - activeProcessors map[topic.Partition]*ProcessorState - activeProcessorsLock sync.Mutex + OnCompletionFunc OnCompletionFunc + bootstrapBrokers []string + waitForMoreMessage bool + activeProcessors map[topic.Partition]*ProcessorState + activeProcessorsLock sync.Mutex } func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { |
