aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-08-27 13:13:14 -0700
committerchrislu <chris.lu@gmail.com>2023-08-27 13:13:14 -0700
commit19904566706e9dba502f1ffd1f6fdf0bf876e99c (patch)
treec874f449729d7ceac90e513f2b8e6677823d1e50 /weed/mq/topic/local_partition.go
parent905911853dd5103496e8fc9b47934fa3a48da214 (diff)
downloadseaweedfs-19904566706e9dba502f1ffd1f6fdf0bf876e99c.tar.xz
seaweedfs-19904566706e9dba502f1ffd1f6fdf0bf876e99c.zip
sub
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go9
1 files changed, 9 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 3fc41f65a..6dbaaea39 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -2,6 +2,7 @@ package topic
import (
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"time"
@@ -14,10 +15,18 @@ type LocalPartition struct {
logBuffer *log_buffer.LogBuffer
}
+type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error
+
func (p LocalPartition) Publish(message *mq_pb.DataMessage) {
p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
}
+func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) {
+ p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool {
+ return true
+ }, eachMessageFn)
+}
+
func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
isLeaer := assignment.LeaderBroker == string(self)
localPartition := &LocalPartition{