aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-12-22 11:33:50 -0800
committerchrislu <chris.lu@gmail.com>2023-12-22 11:33:50 -0800
commite0727071c84cf8f198f0ef402e85d2eecd29d26b (patch)
treeb8484f45e072e26d7da358e3fc0af01986bc2cd7
parentee1c9bc314970931ebbc018e70bd7ad39bd84602 (diff)
downloadseaweedfs-e0727071c84cf8f198f0ef402e85d2eecd29d26b.tar.xz
seaweedfs-e0727071c84cf8f198f0ef402e85d2eecd29d26b.zip
go fmt
-rw-r--r--weed/command/filer_copy.go30
-rw-r--r--weed/command/filer_remote_sync_dir.go2
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go30
-rw-r--r--weed/mq/pub_balancer/balancer.go3
-rw-r--r--weed/mq/pub_balancer/partition_list_broker.go10
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go11
-rw-r--r--weed/mq/sub_coordinator/coordinator.go7
7 files changed, 47 insertions, 46 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index e1a8999f3..1855e643c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -31,21 +31,21 @@ var (
)
type CopyOptions struct {
- include *string
- replication *string
- collection *string
- ttl *string
- diskType *string
- maxMB *int
- masterClient *wdclient.MasterClient
- concurrentFiles *int
- concurrentChunks *int
- grpcDialOption grpc.DialOption
- masters []string
- cipher bool
- ttlSec int32
- checkSize *bool
- verbose *bool
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ diskType *string
+ maxMB *int
+ masterClient *wdclient.MasterClient
+ concurrentFiles *int
+ concurrentChunks *int
+ grpcDialOption grpc.DialOption
+ masters []string
+ cipher bool
+ ttlSec int32
+ checkSize *bool
+ verbose *bool
volumeServerAccess *string
}
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 892af17a0..d4305b666 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -80,7 +80,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
-func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
+func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return nil, err
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 349db3178..b8438b61f 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -37,24 +37,24 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process ack messages
go func() {
- for {
- _, err := stream.Recv()
- if err != nil {
- glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
- }
+ for {
+ _, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+ }
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return
+ }
return
+ default:
+ // Continue processing the request
}
- return
- default:
- // Continue processing the request
}
- }
}()
// send commands to subscriber
@@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
}
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
return err
- case message := <- cgi.ResponseChan:
+ case message := <-cgi.ResponseChan:
if err := stream.Send(message); err != nil {
glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
}
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index 988b971af..5e8c8275e 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -32,9 +32,10 @@ type Balancer struct {
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
}
+
func NewBalancer() *Balancer {
return &Balancer{
- Brokers: cmap.New[*BrokerStats](),
+ Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
}
}
diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go
index 7ceb2a9fc..9dc6140b3 100644
--- a/weed/mq/pub_balancer/partition_list_broker.go
+++ b/weed/mq/pub_balancer/partition_list_broker.go
@@ -6,8 +6,8 @@ import (
)
type PartitionSlotToBroker struct {
- RangeStart int32
- RangeStop int32
+ RangeStart int32
+ RangeStop int32
AssignedBroker string
}
@@ -36,12 +36,12 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke
}
}
ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
AssignedBroker: broker,
})
}
-func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
+func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
for _, partitionSlot := range ps.PartitionSlots {
if partitionSlot.AssignedBroker == broker {
partitionSlot.AssignedBroker = ""
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index be06a01f8..566a26ef7 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -6,28 +6,29 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
+
type ConsumerGroupInstance struct {
InstanceId string
// the consumer group instance may not have an active partition
- Partitions []*topic.Partition
- ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+ Partitions []*topic.Partition
+ ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
}
type ConsumerGroup struct {
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
- mapping *PartitionConsumerMapping
+ mapping *PartitionConsumerMapping
}
func NewConsumerGroup() *ConsumerGroup {
return &ConsumerGroup{
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
- mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
+ mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
}
}
func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
return &ConsumerGroupInstance{
- InstanceId: instanceId,
+ InstanceId: instanceId,
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
}
}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index f4d65ea5b..7ca536c6b 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -6,7 +6,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
-
type TopicConsumerGroups struct {
// map a consumer group name to a consumer group
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
@@ -19,13 +18,13 @@ type TopicConsumerGroups struct {
type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
- balancer *pub_balancer.Balancer
+ balancer *pub_balancer.Balancer
}
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
return &Coordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
- balancer: balancer,
+ balancer: balancer,
}
}
@@ -50,7 +49,7 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName
}
-func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance{
+func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(topic)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {