aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-16 23:16:33 -0700
committerchrislu <chris.lu@gmail.com>2024-03-16 23:16:33 -0700
commit8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5 (patch)
tree23058afcff651e270c3436f24f65d234180fdc4e
parentaba934f0b53a259d4b028fe70eb61c7b89d1ea57 (diff)
downloadseaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.tar.xz
seaweedfs-8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5.zip
notify
-rw-r--r--weed/mq/broker/broker_grpc_sub.go14
-rw-r--r--weed/mq/topic/local_partition.go19
2 files changed, 22 insertions, 11 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 940af7490..846342596 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -175,7 +175,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
- sleepIntervalCount := 0
var counter int64
defer func() {
@@ -198,11 +197,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
var prevFlushTsNs int64
_, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
- sleepIntervalCount++
- if sleepIntervalCount > 32 {
- sleepIntervalCount = 32
- }
- time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
+ // wait for the log buffer to be ready
+ localTopicPartition.ListenersLock.Lock()
+ atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
+ localTopicPartition.ListenersCond.Wait()
+ atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
+ localTopicPartition.ListenersLock.Unlock()
if localTopicPartition.LogBuffer.IsStopping() {
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
@@ -246,8 +246,6 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMes
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
- // reset the sleep interval count
- sleepIntervalCount = 0
// check the follower id
newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 798949736..145b1a450 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,11 +6,18 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync"
"sync/atomic"
"time"
)
type LocalPartition struct {
+ ListenersWaits int64
+
+ // notifying clients
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+
Partition
isLeader bool
FollowerBrokers []pb.ServerAddress
@@ -24,15 +31,21 @@ type LocalPartition struct {
var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
- return &LocalPartition{
+ lp := &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
- LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
- 2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
+ lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+ lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
+ 2*time.Minute, logFlushFn, readFromDiskFn, func() {
+ if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
+ lp.ListenersCond.Broadcast()
+ }
+ })
+ return lp
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {