diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-16 23:16:33 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-16 23:16:33 -0700 |
| commit | 8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5 (patch) | |
| tree | 23058afcff651e270c3436f24f65d234180fdc4e /weed/mq/topic/local_partition.go | |
| parent | aba934f0b53a259d4b028fe70eb61c7b89d1ea57 (diff) | |
| download | seaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.tar.xz seaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.zip | |
notify
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 798949736..145b1a450 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,11 +6,18 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync" "sync/atomic" "time" ) type LocalPartition struct { + ListenersWaits int64 + + // notifying clients + ListenersLock sync.Mutex + ListenersCond *sync.Cond + Partition isLeader bool FollowerBrokers []pb.ServerAddress @@ -24,15 +31,21 @@ type LocalPartition struct { var TIME_FORMAT = "2006-01-02-15-04-05" func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { - return &LocalPartition{ + lp := &LocalPartition{ Partition: partition, isLeader: isLeader, FollowerBrokers: followerBrokers, - LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), - 2*time.Minute, logFlushFn, readFromDiskFn, func() {}), Publishers: NewLocalPartitionPublishers(), Subscribers: NewLocalPartitionSubscribers(), } + lp.ListenersCond = sync.NewCond(&lp.ListenersLock) + lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), + 2*time.Minute, logFlushFn, readFromDiskFn, func() { + if atomic.LoadInt64(&lp.ListenersWaits) > 0 { + lp.ListenersCond.Broadcast() + } + }) + return lp } func (p *LocalPartition) Publish(message *mq_pb.DataMessage) { |
