diff options
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 3fc41f65a..6dbaaea39 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -2,6 +2,7 @@ package topic import ( "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "time" @@ -14,10 +15,18 @@ type LocalPartition struct { logBuffer *log_buffer.LogBuffer } +type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error + func (p LocalPartition) Publish(message *mq_pb.DataMessage) { p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) } +func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) { + p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool { + return true + }, eachMessageFn) +} + func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition { isLeaer := assignment.LeaderBroker == string(self) localPartition := &LocalPartition{ |
