aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-02-29 09:38:52 -0800
committerchrislu <chris.lu@gmail.com>2024-02-29 09:38:52 -0800
commit1b4484bf0a63f66935c9d0d12fda66d619195542 (patch)
tree410ab9eefe80fe861a7f91ed287222c8db20fa7a
parent2a7028373d653d40428410a78dcb291a168ccac6 (diff)
downloadseaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.tar.xz
seaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.zip
go fmt
-rw-r--r--weed/filer/reader_cache.go20
-rw-r--r--weed/mq/broker/broker_grpc_sub.go2
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go8
-rw-r--r--weed/mq/broker/broker_write.go26
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go4
-rw-r--r--weed/mq/client/pub_client/publish.go1
-rw-r--r--weed/mq/client/pub_client/publisher.go2
-rw-r--r--weed/mq/pub_balancer/balancer.go6
-rw-r--r--weed/mq/pub_balancer/broker_stats.go12
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go19
-rw-r--r--weed/mq/sub_coordinator/coordinator.go6
-rw-r--r--weed/mq/topic/local_manager.go4
-rw-r--r--weed/mq/topic/local_partition.go1
-rw-r--r--weed/pb/filer_pb/filer.pb.go4
-rw-r--r--weed/pb/filer_pb/filer_grpc.pb.go2
-rw-r--r--weed/pb/iam_pb/iam.pb.go4
-rw-r--r--weed/pb/iam_pb/iam_grpc.pb.go2
-rw-r--r--weed/pb/master_pb/master.pb.go4
-rw-r--r--weed/pb/master_pb/master_grpc.pb.go2
-rw-r--r--weed/pb/mount_pb/mount.pb.go4
-rw-r--r--weed/pb/mount_pb/mount_grpc.pb.go2
-rw-r--r--weed/pb/mq_pb/mq_grpc.pb.go2
-rw-r--r--weed/pb/remote_pb/remote.pb.go4
-rw-r--r--weed/pb/s3_pb/s3.pb.go4
-rw-r--r--weed/pb/s3_pb/s3_grpc.pb.go2
-rw-r--r--weed/pb/volume_server_pb/volume_server.pb.go4
-rw-r--r--weed/pb/volume_server_pb/volume_server_grpc.pb.go2
-rw-r--r--weed/util/buffered_queue/buffered_queue.go14
-rw-r--r--weed/util/log_buffer/log_buffer.go56
-rw-r--r--weed/util/log_buffer/log_read.go2
-rw-r--r--weed/util/log_buffer/sealed_buffer.go8
31 files changed, 116 insertions, 117 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index b74ad25a0..7be54b193 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -23,16 +23,16 @@ type ReaderCache struct {
type SingleChunkCacher struct {
completedTimeNew int64
sync.Mutex
- parent *ReaderCache
- chunkFileId string
- data []byte
- err error
- cipherKey []byte
- isGzipped bool
- chunkSize int
- shouldCache bool
- wg sync.WaitGroup
- cacheStartedCh chan struct{}
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ cacheStartedCh chan struct{}
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index ddd6786d0..da61ed05d 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
}()
var startPosition log_buffer.MessagePosition
- if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
+ if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
offset := req.GetInit().GetPartitionOffset()
if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index d2dc4ec3e..0f53c28c3 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -26,7 +26,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf("%s/%s",partitionDir, startTime.Format(topic.TIME_FORMAT))
+ targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
// TODO append block with more metadata
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
return b.MasterClient.LookupFileId(fileId)
}
- eachChunkFn := func (buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
@@ -99,7 +99,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
if chunk.Size == 0 {
continue
}
- if chunk.IsChunkManifest{
+ if chunk.IsChunkManifest {
glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
return
}
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
if entry.IsDirectory {
return nil
}
- if stopTsNs!=0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
+ if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
isDone = true
return nil
}
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
index 866cd17c2..896f0ee75 100644
--- a/weed/mq/broker/broker_write.go
+++ b/weed/mq/broker/broker_write.go
@@ -24,14 +24,14 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
var offset int64 = 0
if err == filer_pb.ErrNotFound {
entry = &filer_pb.Entry{
- Name: name,
+ Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
- Crtime: time.Now().Unix(),
- Mtime: time.Now().Unix(),
- FileMode: uint32(os.FileMode(0644)),
- Uid: uint32(os.Getuid()),
- Gid: uint32(os.Getgid()),
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ FileMode: uint32(os.FileMode(0644)),
+ Uid: uint32(os.Getuid()),
+ Gid: uint32(os.Getgid()),
},
}
} else if err != nil {
@@ -45,11 +45,11 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
// update the entry
return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
+ return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: entry,
})
+ })
}
func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
@@ -63,11 +63,11 @@ func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fi
Collection: "topics",
// TtlSec: wfs.option.TtlSec,
// DiskType: string(wfs.option.DiskType),
- DataCenter: b.option.DataCenter,
- Path: targetFile,
+ DataCenter: b.option.DataCenter,
+ Path: targetFile,
},
&operation.UploadOption{
- Cipher: b.option.Cipher,
+ Cipher: b.option.Cipher,
},
func(host, fileId string) string {
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 6f5b2312d..2873ba21f 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -12,8 +12,8 @@ import (
)
var (
- messageCount = flag.Int("n", 1000, "message count")
- concurrency = flag.Int("c", 4, "concurrent publishers")
+ messageCount = flag.Int("n", 1000, "message count")
+ concurrency = flag.Int("c", 4, "concurrent publishers")
partitionCount = flag.Int("p", 6, "partition count")
namespace = flag.String("ns", "test", "namespace")
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 3b9817e74..1c5891049 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -7,7 +7,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index c952bcfb6..9262d6e0c 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -16,7 +16,7 @@ type PublisherConfiguration struct {
Topic topic.Topic
CreateTopic bool
CreateTopicPartitionCount int32
- Brokers []string
+ Brokers []string
}
type PublishClient struct {
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index 0007e7364..ad894b1d8 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -31,10 +31,10 @@ const (
type Balancer struct {
Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
// Collected from all brokers when they connect to the broker leader
- TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
+ TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
OnPartitionChange func(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment)
- OnAddBroker func(broker string, brokerStats *BrokerStats)
- OnRemoveBroker func(broker string, brokerStats *BrokerStats)
+ OnAddBroker func(broker string, brokerStats *BrokerStats)
+ OnRemoveBroker func(broker string, brokerStats *BrokerStats)
}
func NewBalancer() *Balancer {
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index 45c5271df..b4bb28e42 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -39,11 +39,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
+ Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
Partition: topic.Partition{
RangeStart: topicPartitionStats.Partition.RangeStart,
- RangeStop: topicPartitionStats.Partition.RangeStop,
- RingSize: topicPartitionStats.Partition.RingSize,
+ RangeStop: topicPartitionStats.Partition.RangeStop,
+ RingSize: topicPartitionStats.Partition.RingSize,
UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
},
},
@@ -66,11 +66,11 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
+ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
Partition: topic.Partition{
RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- RingSize: partition.RingSize,
+ RangeStop: partition.RangeStop,
+ RingSize: partition.RingSize,
UnixTimeNs: partition.UnixTimeNs,
},
},
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index a1279c204..f897fe2b3 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -16,12 +16,12 @@ type ConsumerGroupInstance struct {
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
}
type ConsumerGroup struct {
- topic topic.Topic
+ topic topic.Topic
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
- mapping *PartitionConsumerMapping
- reBalanceTimer *time.Timer
- pubBalancer *pub_balancer.Balancer
+ mapping *PartitionConsumerMapping
+ reBalanceTimer *time.Timer
+ pubBalancer *pub_balancer.Balancer
}
func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup {
@@ -40,13 +40,13 @@ func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
}
}
func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
- cg.onConsumerGroupInstanceChange("add consumer instance "+ consumerGroupInstance)
+ cg.onConsumerGroupInstanceChange("add consumer instance " + consumerGroupInstance)
}
func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
- cg.onConsumerGroupInstanceChange("remove consumer instance "+ consumerGroupInstance)
+ cg.onConsumerGroupInstanceChange("remove consumer instance " + consumerGroupInstance)
}
-func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string){
+func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) {
if cg.reBalanceTimer != nil {
cg.reBalanceTimer.Stop()
cg.reBalanceTimer = nil
@@ -107,9 +107,9 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
for i, partitionSlot := range partitionSlots {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
Partition: &mq_pb.Partition{
- RangeStop: partitionSlot.RangeStop,
+ RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
- RingSize: partitionSlotToBrokerList.RingSize,
+ RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: partitionSlot.UnixTimeNs,
},
Broker: partitionSlot.Broker,
@@ -126,5 +126,4 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
consumerGroupInstance.ResponseChan <- response
}
-
}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 5a4474076..bb50991ab 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -31,7 +31,7 @@ func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName)
- if tcg == nil && createIfMissing{
+ if tcg == nil && createIfMissing {
tcg = &TopicConsumerGroups{
ConsumerGroups: cmap.New[*ConsumerGroup](),
}
@@ -56,14 +56,14 @@ func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string,
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {
cg = NewConsumerGroup(topic, c.balancer)
- if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg){
+ if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) {
cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
}
}
cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
if cgi == nil {
cgi = NewConsumerGroupInstance(consumerGroupInstance)
- if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi){
+ if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) {
cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
}
}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index fc8ea2b1e..aa2eefcdc 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -88,7 +88,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{
- Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
+ Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
Partition: localPartition.Partition,
}
stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
@@ -96,7 +96,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
Namespace: string(localTopic.Namespace),
Name: localTopic.Name,
},
- Partition: localPartition.Partition.ToPbPartition(),
+ Partition: localPartition.Partition.ToPbPartition(),
ConsumerCount: localPartition.ConsumerCount,
}
// fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition)
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 9b7281b65..8ae029bb4 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -22,6 +22,7 @@ type LocalPartition struct {
}
var TIME_FORMAT = "2006-01-02-15-04-05"
+
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
return &LocalPartition{
Partition: partition,
diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go
index 38101afdb..4e013f9f8 100644
--- a/weed/pb/filer_pb/filer.pb.go
+++ b/weed/pb/filer_pb/filer.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: filer.proto
package filer_pb
diff --git a/weed/pb/filer_pb/filer_grpc.pb.go b/weed/pb/filer_pb/filer_grpc.pb.go
index edc47d7df..ae1564f43 100644
--- a/weed/pb/filer_pb/filer_grpc.pb.go
+++ b/weed/pb/filer_pb/filer_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: filer.proto
package filer_pb
diff --git a/weed/pb/iam_pb/iam.pb.go b/weed/pb/iam_pb/iam.pb.go
index 1468d7b80..d1bd1c4b3 100644
--- a/weed/pb/iam_pb/iam.pb.go
+++ b/weed/pb/iam_pb/iam.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: iam.proto
package iam_pb
diff --git a/weed/pb/iam_pb/iam_grpc.pb.go b/weed/pb/iam_pb/iam_grpc.pb.go
index ba6f9879a..3c2a10a90 100644
--- a/weed/pb/iam_pb/iam_grpc.pb.go
+++ b/weed/pb/iam_pb/iam_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: iam.proto
package iam_pb
diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go
index 30486eaaa..c619776ff 100644
--- a/weed/pb/master_pb/master.pb.go
+++ b/weed/pb/master_pb/master.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: master.proto
package master_pb
diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go
index e8462bf96..4c41658cf 100644
--- a/weed/pb/master_pb/master_grpc.pb.go
+++ b/weed/pb/master_pb/master_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: master.proto
package master_pb
diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go
index 226e4705c..c93aa3a52 100644
--- a/weed/pb/mount_pb/mount.pb.go
+++ b/weed/pb/mount_pb/mount.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: mount.proto
package mount_pb
diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go
index bd6cd9f17..3dd6d126b 100644
--- a/weed/pb/mount_pb/mount_grpc.pb.go
+++ b/weed/pb/mount_pb/mount_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: mount.proto
package mount_pb
diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go
index 39e1c115d..c2ddcdc62 100644
--- a/weed/pb/mq_pb/mq_grpc.pb.go
+++ b/weed/pb/mq_pb/mq_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: mq.proto
package mq_pb
diff --git a/weed/pb/remote_pb/remote.pb.go b/weed/pb/remote_pb/remote.pb.go
index 481b308f2..469a85a19 100644
--- a/weed/pb/remote_pb/remote.pb.go
+++ b/weed/pb/remote_pb/remote.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: remote.proto
package remote_pb
diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go
index 82903904b..e8c8f3226 100644
--- a/weed/pb/s3_pb/s3.pb.go
+++ b/weed/pb/s3_pb/s3.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: s3.proto
package s3_pb
diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go
index 304fe4581..2fedf571b 100644
--- a/weed/pb/s3_pb/s3_grpc.pb.go
+++ b/weed/pb/s3_pb/s3_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: s3.proto
package s3_pb
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 3811b7fbd..de4891a56 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.32.0
-// protoc v4.25.2
+// protoc-gen-go v1.31.0
+// protoc v4.25.3
// source: volume_server.proto
package volume_server_pb
diff --git a/weed/pb/volume_server_pb/volume_server_grpc.pb.go b/weed/pb/volume_server_pb/volume_server_grpc.pb.go
index 632b18abb..940adf339 100644
--- a/weed/pb/volume_server_pb/volume_server_grpc.pb.go
+++ b/weed/pb/volume_server_pb/volume_server_grpc.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
-// - protoc v4.25.2
+// - protoc v4.25.3
// source: volume_server.proto
package volume_server_pb
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index 6f5f79eb5..edaa0a7ce 100644
--- a/weed/util/buffered_queue/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -32,12 +32,12 @@ func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
bq := &BufferedQueue[T]{
- chunkSize: chunkSize,
- head: chunk,
- tail: chunk,
- last: chunk,
- count: 0,
- mutex: sync.Mutex{},
+ chunkSize: chunkSize,
+ head: chunk,
+ tail: chunk,
+ last: chunk,
+ count: 0,
+ mutex: sync.Mutex{},
}
bq.waitCond = sync.NewCond(&bq.mutex)
return bq
@@ -87,7 +87,7 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
- for q.count <= 0 && !q.isClosed {
+ for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
if q.count <= 0 && q.isClosed {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index e7dd3dce0..273df5593 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -27,39 +27,39 @@ type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
- name string
- prevBuffers *SealedBuffers
- buf []byte
- batchIndex int64
- idx []int
- pos int
- startTime time.Time
- stopTime time.Time
- lastFlushTime time.Time
- sizeBuf []byte
- flushInterval time.Duration
+ name string
+ prevBuffers *SealedBuffers
+ buf []byte
+ batchIndex int64
+ idx []int
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ lastFlushTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
- isStopping *atomic.Bool
- flushChan chan *dataToFlush
- lastTsNs int64
+ isStopping *atomic.Bool
+ flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
- name: name,
- prevBuffers: newSealedBuffers(PreviousBufferCount),
- buf: make([]byte, BufferSize),
- sizeBuf: make([]byte, 4),
- flushInterval: flushInterval,
- flushFn: flushFn,
+ name: name,
+ prevBuffers: newSealedBuffers(PreviousBufferCount),
+ buf: make([]byte, BufferSize),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
- notifyFn: notifyFn,
- flushChan: make(chan *dataToFlush, 256),
- isStopping: new(atomic.Bool),
+ notifyFn: notifyFn,
+ flushChan: make(chan *dataToFlush, 256),
+ isStopping: new(atomic.Bool),
}
go lb.loopFlush()
go lb.loopInterval()
@@ -199,10 +199,10 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
return nil
}
-func (logBuffer *LogBuffer) GetEarliestTime() time.Time{
+func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
-func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{
+func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
BatchIndex: logBuffer.batchIndex,
@@ -241,8 +241,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
if tsMemory.IsZero() { // case 2.2
println("2.2 no data")
- return nil, -2,nil
- } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3
+ return nil, -2, nil
+ } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
if !logBuffer.lastFlushTime.IsZero() {
glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime)
return nil, -2, ResumeFromDiskError
@@ -273,7 +273,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
- return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
}
lastTs := lastReadPosition.UnixNano()
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 6acd5a50f..8a4d2d851 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -18,7 +18,7 @@ var (
)
type MessagePosition struct {
- time.Time // this is the timestamp of the message
+ time.Time // this is the timestamp of the message
BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
}
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
index 920a811f2..c41b30fcc 100644
--- a/weed/util/log_buffer/sealed_buffer.go
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -6,10 +6,10 @@ import (
)
type MemBuffer struct {
- buf []byte
- size int
- startTime time.Time
- stopTime time.Time
+ buf []byte
+ size int
+ startTime time.Time
+ stopTime time.Time
batchIndex int64
}