diff options
| author | chrislu <chris.lu@gmail.com> | 2023-08-27 13:13:14 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-08-27 13:13:14 -0700 |
| commit | 19904566706e9dba502f1ffd1f6fdf0bf876e99c (patch) | |
| tree | c874f449729d7ceac90e513f2b8e6677823d1e50 /weed/mq/topic/local_partition.go | |
| parent | 905911853dd5103496e8fc9b47934fa3a48da214 (diff) | |
| download | seaweedfs-19904566706e9dba502f1ffd1f6fdf0bf876e99c.tar.xz seaweedfs-19904566706e9dba502f1ffd1f6fdf0bf876e99c.zip | |
sub
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 3fc41f65a..6dbaaea39 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -2,6 +2,7 @@ package topic import ( "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "time" @@ -14,10 +15,18 @@ type LocalPartition struct { logBuffer *log_buffer.LogBuffer } +type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error + func (p LocalPartition) Publish(message *mq_pb.DataMessage) { p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) } +func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) { + p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool { + return true + }, eachMessageFn) +} + func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition { isLeaer := assignment.LeaderBroker == string(self) localPartition := &LocalPartition{ |
