diff options
| author | chrislu <chris.lu@gmail.com> | 2024-12-19 19:23:27 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-12-19 19:25:06 -0800 |
| commit | ec155022e7e8d0f30bfcfa169248ec3ecc40e960 (patch) | |
| tree | 613416a40b45bd67bb9baf87be9843798a487914 /weed/mq | |
| parent | a1a76ccb8c3316baf269aa856ba268d53e0943ba (diff) | |
| download | seaweedfs-ec155022e7e8d0f30bfcfa169248ec3ecc40e960.tar.xz seaweedfs-ec155022e7e8d0f30bfcfa169248ec3ecc40e960.zip | |
"golang.org/x/exp/slices" => "slices" and go fmt
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/client/cmd/weed_pub_kv/publisher_kv.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publish.go | 2 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/allocate_test.go | 24 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/consumer_group_instance.go | 4 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/inflight_message_tracker.go | 8 | ||||
| -rw-r--r-- | weed/mq/topic/local_topic.go | 2 |
7 files changed, 22 insertions, 22 deletions
diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go index 3ab3cb251..b4d07ae02 100644 --- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go +++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "log" "strings" "sync" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go index 8ff667763..d51fdf7af 100644 --- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go +++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go @@ -7,11 +7,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "strings" "time" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index a85eec31f..2a31a2185 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -50,7 +50,7 @@ func (p *TopicPublisher) FinishPublish() error { inputBuffer.Enqueue(&mq_pb.DataMessage{ TsNs: time.Now().UnixNano(), Ctrl: &mq_pb.ControlMessage{ - IsClose: true, + IsClose: true, PublisherName: p.config.PublisherName, }, }) diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go index 4b36af388..63692af0f 100644 --- a/weed/mq/pub_balancer/allocate_test.go +++ b/weed/mq/pub_balancer/allocate_test.go @@ -95,8 +95,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 1, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "", - Partition: &mq_pb.Partition{}, + LeaderBroker: "", + Partition: &mq_pb.Partition{}, FollowerBroker: "localhost:2", }, }, @@ -110,8 +110,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 1, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, FollowerBroker: "", }, }, @@ -125,8 +125,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 1, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, FollowerBroker: "localhost:200", }, }, @@ -140,8 +140,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 1, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:100", - Partition: &mq_pb.Partition{}, + LeaderBroker: "localhost:100", + Partition: &mq_pb.Partition{}, FollowerBroker: "localhost:200", }, }, @@ -155,8 +155,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 3, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, FollowerBroker: "localhost:2", }, }, @@ -184,8 +184,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) { followerCount: 3, assignments: []*mq_pb.BrokerPartitionAssignment{ { - LeaderBroker: "localhost:1", - Partition: &mq_pb.Partition{}, + LeaderBroker: "localhost:1", + Partition: &mq_pb.Partition{}, FollowerBroker: "localhost:2", }, }, diff --git a/weed/mq/sub_coordinator/consumer_group_instance.go b/weed/mq/sub_coordinator/consumer_group_instance.go index fce7dde66..74a35b40a 100644 --- a/weed/mq/sub_coordinator/consumer_group_instance.go +++ b/weed/mq/sub_coordinator/consumer_group_instance.go @@ -16,8 +16,8 @@ type ConsumerGroupInstance struct { func NewConsumerGroupInstance(instanceId string, maxPartitionCount int32) *ConsumerGroupInstance { return &ConsumerGroupInstance{ - InstanceId: ConsumerGroupInstanceId(instanceId), - ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), + InstanceId: ConsumerGroupInstanceId(instanceId), + ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1), MaxPartitionCount: maxPartitionCount, } } diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index f1c46e06b..2cdfbc4e5 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -84,9 +84,9 @@ type TimestampStatus struct { // RingBuffer represents a circular buffer to hold timestamps. type RingBuffer struct { - buffer []*TimestampStatus - head int - size int + buffer []*TimestampStatus + head int + size int maxTimestamp int64 maxAllAckedTs int64 } @@ -111,7 +111,7 @@ func (rb *RingBuffer) EnflightTimestamp(timestamp int64) { if rb.size < len(rb.buffer) { rb.size++ } else { - newBuf := newBuffer(2*len(rb.buffer)) + newBuf := newBuffer(2 * len(rb.buffer)) for i := 0; i < rb.size; i++ { newBuf[i] = rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)] } diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go index 25ae03df6..a35bb32b3 100644 --- a/weed/mq/topic/local_topic.go +++ b/weed/mq/topic/local_topic.go @@ -4,7 +4,7 @@ import "sync" type LocalTopic struct { Topic - Partitions []*LocalPartition + Partitions []*LocalPartition partitionLock sync.RWMutex } |
