aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_server.go6
-rw-r--r--weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go8
-rw-r--r--weed/mq/client/cmd/weed_sub_record/subscriber_record.go8
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go2
-rw-r--r--weed/mq/client/sub_client/subscriber.go14
5 files changed, 19 insertions, 19 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index cdf652294..44d9deed6 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -44,9 +44,9 @@ type MessageQueueBroker struct {
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
PubBalancer *pub_balancer.PubBalancer
- lockAsBalancer *cluster.LiveLock
- SubCoordinator *sub_coordinator.SubCoordinator
- accessLock sync.Mutex
+ lockAsBalancer *cluster.LiveLock
+ SubCoordinator *sub_coordinator.SubCoordinator
+ accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor
}
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index 792794a4a..902e7ed1b 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -14,10 +14,10 @@ import (
)
var (
- namespace = flag.String("ns", "test", "namespace")
- t = flag.String("topic", "test", "topic")
- seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
- maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index 262da6c4d..674c881ba 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -16,10 +16,10 @@ import (
)
var (
- namespace = flag.String("ns", "test", "namespace")
- t = flag.String("topic", "test", "topic")
- seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
- maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+ maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 8d5bf2044..6ee447085 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -65,7 +65,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
select {
case <-stopCh:
break
- case ack := <- partitionOffsetChan:
+ case ack := <-partitionOffsetChan:
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 129392e44..922593b77 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -27,16 +27,16 @@ type OnEachMessageFunc func(key, value []byte) (err error)
type OnCompletionFunc func()
type TopicSubscriber struct {
- SubscriberConfig *SubscriberConfiguration
- ContentConfig *ContentConfiguration
+ SubscriberConfig *SubscriberConfiguration
+ ContentConfig *ContentConfiguration
brokerPartitionAssignmentChan chan *mq_pb.SubscriberToSubCoordinatorResponse
brokerPartitionAssignmentAckChan chan *mq_pb.SubscriberToSubCoordinatorRequest
OnEachMessageFunc OnEachMessageFunc
- OnCompletionFunc OnCompletionFunc
- bootstrapBrokers []string
- waitForMoreMessage bool
- activeProcessors map[topic.Partition]*ProcessorState
- activeProcessorsLock sync.Mutex
+ OnCompletionFunc OnCompletionFunc
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ activeProcessors map[topic.Partition]*ProcessorState
+ activeProcessorsLock sync.Mutex
}
func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {