aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-02 16:25:43 -0700
committerchrislu <chris.lu@gmail.com>2024-04-02 16:25:43 -0700
commitccdd9cd8decf66089ac201b7c2ca1f5889582b93 (patch)
tree3297dc54cff9893043fea8675b9ea3ef9d595bf3
parentf37c0d0d7a59d433a48a0102d7d76471ab034f40 (diff)
downloadseaweedfs-ccdd9cd8decf66089ac201b7c2ca1f5889582b93.tar.xz
seaweedfs-ccdd9cd8decf66089ac201b7c2ca1f5889582b93.zip
refactor
-rw-r--r--weed/filer/filer_notify.go2
-rw-r--r--weed/filer/meta_aggregator.go2
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go2
-rw-r--r--weed/mq/topic/local_partition.go2
-rw-r--r--weed/util/log_buffer/log_buffer.go7
-rw-r--r--weed/util/log_buffer/log_buffer_test.go7
6 files changed, 16 insertions, 6 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index db78b3d3d..db953d398 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -83,7 +83,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
return
}
- f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
+ f.LocalMetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs)
}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 663fdfe9f..976822ad1 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -168,7 +168,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
}
dir := event.Directory
// println("received meta change", dir, "size", len(data))
- ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
+ ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 57cbbd2d2..d8100f021 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
// TODO: change this to DataMessage
// log the message
- logBuffer.AddToBuffer(dataMessage.Key, dataMessage.Value, dataMessage.TsNs)
+ logBuffer.AddToBuffer(dataMessage)
// send back the ack
if err := stream.Send(&mq_pb.PublishFollowMeResponse{
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 157fa2792..54c122a0f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -52,7 +52,7 @@ func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncTy
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
- p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+ p.LogBuffer.AddToBuffer(message)
// maybe send to the follower
if p.followerStream != nil {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 65d20a757..efe42176e 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
"sync/atomic"
"time"
@@ -68,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
return lb
}
-func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
+func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
+ logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
+}
+
+func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
var toFlush *dataToFlush
logBuffer.Lock()
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index 84279f625..067a02ef4 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -3,6 +3,7 @@ package log_buffer
import (
"crypto/rand"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"sync"
"testing"
@@ -50,7 +51,11 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf, 0)
+ lb.AddToBuffer(&mq_pb.DataMessage{
+ Key: nil,
+ Value: buf,
+ TsNs: 0,
+ })
}
wg.Wait()