aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/lock_client.go6
-rw-r--r--weed/mq/broker/broker_grpc_pub.go6
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go4
-rw-r--r--weed/mq/broker/broker_grpc_sub.go19
-rw-r--r--weed/mq/pub_balancer/allocate.go4
-rw-r--r--weed/mq/pub_balancer/allocate_test.go42
-rw-r--r--weed/mq/topic/local_partition.go14
-rw-r--r--weed/util/log_buffer/log_buffer.go30
8 files changed, 62 insertions, 63 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index 5b8daf1a7..6618f5d2f 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -31,9 +31,9 @@ func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *
type LiveLock struct {
key string
renewToken string
- expireAtNs int64
- hostFiler pb.ServerAddress
- cancelCh chan struct{}
+ expireAtNs int64
+ hostFiler pb.ServerAddress
+ cancelCh chan struct{}
grpcDialOption grpc.DialOption
isLocked bool
self string
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index b9dc807e1..3b68db1af 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -63,9 +63,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
for _, follower := range initMessage.FollowerBrokers {
followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
- Topic: initMessage.Topic,
- Partition: initMessage.Partition,
- BrokerSelf: string(b.option.BrokerAddress()),
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ BrokerSelf: string(b.option.BrokerAddress()),
})
return err
})
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 533e32f18..8ef85110a 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -12,7 +12,7 @@ import (
"time"
)
-func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
+func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
glog.V(0).Infof("PublishFollowMe %v", request)
var wg sync.WaitGroup
wg.Add(1)
@@ -75,7 +75,7 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m
}
if resp.Message != nil {
// process ctrl message or data message
- switch m:= resp.Message.(type) {
+ switch m := resp.Message.(type) {
case *mq_pb.FollowInMemoryMessagesResponse_Data:
// process data message
print("d")
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index e6027d26b..1141ff47f 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -130,7 +130,7 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
return
}
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
+func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
ctx := stream.Context()
clientName := req.GetInit().ConsumerId
@@ -188,8 +188,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
// to indicate the follower is connected
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- },
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
},
})
@@ -200,7 +199,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
var prevFlushTsNs int64
- _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
+ _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
if !isConnected {
return false
}
@@ -285,12 +284,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
// send the log entry
if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
- }}); err != nil {
+ Data: &mq_pb.DataMessage{
+ Key: logEntry.Key,
+ Value: logEntry.Data,
+ TsNs: logEntry.TsNs,
+ },
+ }}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return false, err
}
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 39d91bef3..d7632f8d6 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -102,7 +102,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
assignment.LeaderBroker = ""
count++
}
- for i:=0; i<followerCount; i++ {
+ for i := 0; i < followerCount; i++ {
if i >= len(assignment.FollowerBrokers) {
count++
continue
@@ -128,7 +128,7 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
hasEmptyFollowers := false
j := 0
- for ; j<len(assignment.FollowerBrokers); j++ {
+ for ; j < len(assignment.FollowerBrokers); j++ {
if assignment.FollowerBrokers[j] == "" {
hasChanges = true
if i < len(pickedBrokers) {
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 5f6342e99..89a6bb23c 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -86,19 +86,19 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
singleActiveBroker := cmap.New[*BrokerStats]()
singleActiveBroker.SetIfAbsent("localhost:1", &BrokerStats{})
tests := []struct {
- name string
- args args
- hasChanges bool
+ name string
+ args args
+ hasChanges bool
}{
{
name: "test empty leader",
args: args{
activeBrokers: activeBrokers,
followerCount: 1,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
@@ -112,10 +112,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: activeBrokers,
followerCount: 1,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"",
},
@@ -129,10 +129,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: activeBrokers,
followerCount: 1,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:200",
},
@@ -146,10 +146,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: activeBrokers,
followerCount: 1,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:100",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:200",
},
@@ -163,10 +163,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: activeBrokers,
followerCount: 3,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
@@ -180,10 +180,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: activeBrokers,
followerCount: 10,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
@@ -197,10 +197,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: lowActiveBrokers,
followerCount: 3,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
@@ -214,10 +214,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: lowActiveBrokers,
followerCount: 1,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
},
},
},
@@ -228,10 +228,10 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
args: args{
activeBrokers: singleActiveBroker,
followerCount: 3,
- assignments: []*mq_pb.BrokerPartitionAssignment{
+ assignments: []*mq_pb.BrokerPartitionAssignment{
{
LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ Partition: &mq_pb.Partition{},
FollowerBrokers: []string{
"localhost:2",
},
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 062f3f4bd..798949736 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -12,13 +12,13 @@ import (
type LocalPartition struct {
Partition
- isLeader bool
- FollowerBrokers []pb.ServerAddress
- LogBuffer *log_buffer.LogBuffer
- ConsumerCount int32
- Publishers *LocalPartitionPublishers
- Subscribers *LocalPartitionSubscribers
- FollowerId int32
+ isLeader bool
+ FollowerBrokers []pb.ServerAddress
+ LogBuffer *log_buffer.LogBuffer
+ ConsumerCount int32
+ Publishers *LocalPartitionPublishers
+ Subscribers *LocalPartitionSubscribers
+ FollowerId int32
}
var TIME_FORMAT = "2006-01-02-15-04-05"
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 02d2a6579..64abc67ff 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -27,24 +27,24 @@ type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time,
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
- LastFlushTsNs int64
- name string
- prevBuffers *SealedBuffers
- buf []byte
- batchIndex int64
- idx []int
- pos int
- startTime time.Time
+ LastFlushTsNs int64
+ name string
+ prevBuffers *SealedBuffers
+ buf []byte
+ batchIndex int64
+ idx []int
+ pos int
+ startTime time.Time
stopTime time.Time
lastFlushDataTime time.Time
sizeBuf []byte
- flushInterval time.Duration
- flushFn LogFlushFuncType
- ReadFromDiskFn LogReadFromDiskFuncType
- notifyFn func()
- isStopping *atomic.Bool
- flushChan chan *dataToFlush
- lastTsNs int64
+ flushInterval time.Duration
+ flushFn LogFlushFuncType
+ ReadFromDiskFn LogReadFromDiskFuncType
+ notifyFn func()
+ isStopping *atomic.Bool
+ flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}