aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-16 23:16:33 -0700
committerchrislu <chris.lu@gmail.com>2024-03-16 23:16:33 -0700
commit8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5 (patch)
tree23058afcff651e270c3436f24f65d234180fdc4e /weed/mq/topic/local_partition.go
parentaba934f0b53a259d4b028fe70eb61c7b89d1ea57 (diff)
downloadseaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.tar.xz
seaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.zip
notify
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go19
1 files changed, 16 insertions, 3 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 798949736..145b1a450 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,11 +6,18 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync"
"sync/atomic"
"time"
)
type LocalPartition struct {
+ ListenersWaits int64
+
+ // notifying clients
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+
Partition
isLeader bool
FollowerBrokers []pb.ServerAddress
@@ -24,15 +31,21 @@ type LocalPartition struct {
var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
- return &LocalPartition{
+ lp := &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
- LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
- 2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
+ lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+ lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
+ 2*time.Minute, logFlushFn, readFromDiskFn, func() {
+ if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
+ lp.ListenersCond.Broadcast()
+ }
+ })
+ return lp
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {