aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-12-19 19:23:27 -0800
committerchrislu <chris.lu@gmail.com>2024-12-19 19:25:06 -0800
commitec155022e7e8d0f30bfcfa169248ec3ecc40e960 (patch)
tree613416a40b45bd67bb9baf87be9843798a487914 /weed/mq
parenta1a76ccb8c3316baf269aa856ba268d53e0943ba (diff)
downloadseaweedfs-ec155022e7e8d0f30bfcfa169248ec3ecc40e960.tar.xz
seaweedfs-ec155022e7e8d0f30bfcfa169248ec3ecc40e960.zip
"golang.org/x/exp/slices" => "slices" and go fmt
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/client/cmd/weed_pub_kv/publisher_kv.go2
-rw-r--r--weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go2
-rw-r--r--weed/mq/client/pub_client/publish.go2
-rw-r--r--weed/mq/pub_balancer/allocate_test.go24
-rw-r--r--weed/mq/sub_coordinator/consumer_group_instance.go4
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go8
-rw-r--r--weed/mq/topic/local_topic.go2
7 files changed, 22 insertions, 22 deletions
diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
index 3ab3cb251..b4d07ae02 100644
--- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
+++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
@@ -5,11 +5,11 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"log"
"strings"
"sync"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index 8ff667763..d51fdf7af 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -7,11 +7,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strings"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index a85eec31f..2a31a2185 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -50,7 +50,7 @@ func (p *TopicPublisher) FinishPublish() error {
inputBuffer.Enqueue(&mq_pb.DataMessage{
TsNs: time.Now().UnixNano(),
Ctrl: &mq_pb.ControlMessage{
- IsClose: true,
+ IsClose: true,
PublisherName: p.config.PublisherName,
},
})
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 4b36af388..63692af0f 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -95,8 +95,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 1,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
@@ -110,8 +110,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 1,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "",
},
},
@@ -125,8 +125,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 1,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "localhost:200",
},
},
@@ -140,8 +140,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 1,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:100",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "localhost:100",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "localhost:200",
},
},
@@ -155,8 +155,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 3,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
@@ -184,8 +184,8 @@ func TestEnsureAssignmentsToActiveBrokersX(t *testing.T) {
followerCount: 3,
assignments: []*mq_pb.BrokerPartitionAssignment{
{
- LeaderBroker: "localhost:1",
- Partition: &mq_pb.Partition{},
+ LeaderBroker: "localhost:1",
+ Partition: &mq_pb.Partition{},
FollowerBroker: "localhost:2",
},
},
diff --git a/weed/mq/sub_coordinator/consumer_group_instance.go b/weed/mq/sub_coordinator/consumer_group_instance.go
index fce7dde66..74a35b40a 100644
--- a/weed/mq/sub_coordinator/consumer_group_instance.go
+++ b/weed/mq/sub_coordinator/consumer_group_instance.go
@@ -16,8 +16,8 @@ type ConsumerGroupInstance struct {
func NewConsumerGroupInstance(instanceId string, maxPartitionCount int32) *ConsumerGroupInstance {
return &ConsumerGroupInstance{
- InstanceId: ConsumerGroupInstanceId(instanceId),
- ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
+ InstanceId: ConsumerGroupInstanceId(instanceId),
+ ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
MaxPartitionCount: maxPartitionCount,
}
}
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index f1c46e06b..2cdfbc4e5 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -84,9 +84,9 @@ type TimestampStatus struct {
// RingBuffer represents a circular buffer to hold timestamps.
type RingBuffer struct {
- buffer []*TimestampStatus
- head int
- size int
+ buffer []*TimestampStatus
+ head int
+ size int
maxTimestamp int64
maxAllAckedTs int64
}
@@ -111,7 +111,7 @@ func (rb *RingBuffer) EnflightTimestamp(timestamp int64) {
if rb.size < len(rb.buffer) {
rb.size++
} else {
- newBuf := newBuffer(2*len(rb.buffer))
+ newBuf := newBuffer(2 * len(rb.buffer))
for i := 0; i < rb.size; i++ {
newBuf[i] = rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)]
}
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
index 25ae03df6..a35bb32b3 100644
--- a/weed/mq/topic/local_topic.go
+++ b/weed/mq/topic/local_topic.go
@@ -4,7 +4,7 @@ import "sync"
type LocalTopic struct {
Topic
- Partitions []*LocalPartition
+ Partitions []*LocalPartition
partitionLock sync.RWMutex
}