diff options
| -rw-r--r-- | weed/cluster/lock_client.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_follow.go | 4 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 19 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/allocate.go | 4 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/allocate_test.go | 42 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 14 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 30 |
8 files changed, 62 insertions, 63 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go index 5b8daf1a7..6618f5d2f 100644 --- a/weed/cluster/lock_client.go +++ b/weed/cluster/lock_client.go @@ -31,9 +31,9 @@ func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) * type LiveLock struct { key string renewToken string - expireAtNs int64 - hostFiler pb.ServerAddress - cancelCh chan struct{} + expireAtNs int64 + hostFiler pb.ServerAddress + cancelCh chan struct{} grpcDialOption grpc.DialOption isLocked bool self string diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index b9dc807e1..3b68db1af 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -63,9 +63,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis for _, follower := range initMessage.FollowerBrokers { followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ - Topic: initMessage.Topic, - Partition: initMessage.Partition, - BrokerSelf: string(b.option.BrokerAddress()), + Topic: initMessage.Topic, + Partition: initMessage.Partition, + BrokerSelf: string(b.option.BrokerAddress()), }) return err }) diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 533e32f18..8ef85110a 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -12,7 +12,7 @@ import ( "time" ) -func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){ +func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) { glog.V(0).Infof("PublishFollowMe %v", request) var wg sync.WaitGroup wg.Add(1) @@ -75,7 +75,7 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m } if resp.Message != nil { // process ctrl message or data message - switch m:= resp.Message.(type) { + switch m := resp.Message.(type) { case *mq_pb.FollowInMemoryMessagesResponse_Data: // process data message print("d") diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index e6027d26b..1141ff47f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -130,7 +130,7 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer return } -func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { +func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { ctx := stream.Context() clientName := req.GetInit().ConsumerId @@ -188,8 +188,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // to indicate the follower is connected stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ - Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ - }, + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{}, }, }) @@ -200,7 +199,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe var prevFlushTsNs int64 - _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { + _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { if !isConnected { return false } @@ -285,12 +284,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // send the log entry if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Data{ - Data: &mq_pb.DataMessage{ - Key: logEntry.Key, - Value: logEntry.Data, - TsNs: logEntry.TsNs, - }, - }}); err != nil { + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return false, err } diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 39d91bef3..d7632f8d6 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -102,7 +102,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * assignment.LeaderBroker = "" count++ } - for i:=0; i<followerCount; i++ { + for i := 0; i < followerCount; i++ { if i >= len(assignment.FollowerBrokers) { count++ continue @@ -128,7 +128,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * hasEmptyFollowers := false j := 0 - for ; j<len(assignment.FollowerBrokers); j++ { + for ; j < len(assignment.FollowerBrokers); j++ { if assignment.FollowerBrokers[j] == "" { hasChanges = true if i < len(pickedBrokers) { diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 5f6342e99..89a6bb23c 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -86,19 +86,19 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { singleActiveBroker := cmap.New[*BrokerStats]() singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{}) tests := []struct { - name string - args args - hasChanges bool + name string + args args + hasChanges bool }{ { name: "test empty leader", args: args{ activeBrokers: activeBrokers, followerCount: 1, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:2", }, @@ -112,10 +112,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: activeBrokers, followerCount: 1, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "", }, @@ -129,10 +129,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: activeBrokers, followerCount: 1, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:200", }, @@ -146,10 +146,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: activeBrokers, followerCount: 1, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:100", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:200", }, @@ -163,10 +163,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: activeBrokers, followerCount: 3, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:2", }, @@ -180,10 +180,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: activeBrokers, followerCount: 10, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:2", }, @@ -197,10 +197,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: lowActiveBrokers, followerCount: 3, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:2", }, @@ -214,10 +214,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: lowActiveBrokers, followerCount: 1, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, }, }, }, @@ -228,10 +228,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { args: args{ activeBrokers: singleActiveBroker, followerCount: 3, - assignments: []*mq_pb.BrokerPartitionAssignment{ + assignments: []*mq_pb.BrokerPartitionAssignment{ { LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + Partition: &mq_pb.Partition{}, FollowerBrokers: []string{ "localhost:2", }, diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 062f3f4bd..798949736 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -12,13 +12,13 @@ import ( type LocalPartition struct { Partition - isLeader bool - FollowerBrokers []pb.ServerAddress - LogBuffer *log_buffer.LogBuffer - ConsumerCount int32 - Publishers *LocalPartitionPublishers - Subscribers *LocalPartitionSubscribers - FollowerId int32 + isLeader bool + FollowerBrokers []pb.ServerAddress + LogBuffer *log_buffer.LogBuffer + ConsumerCount int32 + Publishers *LocalPartitionPublishers + Subscribers *LocalPartitionSubscribers + FollowerId int32 } var TIME_FORMAT = "2006-01-02-15-04-05" diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 02d2a6579..64abc67ff 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -27,24 +27,24 @@ type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { - LastFlushTsNs int64 - name string - prevBuffers *SealedBuffers - buf []byte - batchIndex int64 - idx []int - pos int - startTime time.Time + LastFlushTsNs int64 + name string + prevBuffers *SealedBuffers + buf []byte + batchIndex int64 + idx []int + pos int + startTime time.Time stopTime time.Time lastFlushDataTime time.Time sizeBuf []byte - flushInterval time.Duration - flushFn LogFlushFuncType - ReadFromDiskFn LogReadFromDiskFuncType - notifyFn func() - isStopping *atomic.Bool - flushChan chan *dataToFlush - lastTsNs int64 + flushInterval time.Duration + flushFn LogFlushFuncType + ReadFromDiskFn LogReadFromDiskFuncType + notifyFn func() + isStopping *atomic.Bool + flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } |
