about summary refs log tree commit diff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--  weed/mq/broker/broker_grpc_sub.go  102
1 files changed, 71 insertions, 31 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index a9fdaaf9f..0d3298ae8 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -2,9 +2,10 @@ package broker
import (
"context"
- "errors"
"fmt"
"io"
+ "sync"
+ "sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -28,7 +29,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
return fmt.Errorf("missing init message")
}
- ctx := stream.Context()
+ // Create a cancellable context so we can properly clean up when the client disconnects
+ ctx, cancel := context.WithCancel(stream.Context())
+ defer cancel() // Ensure context is cancelled when function exits
+
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
t := topic.FromPbTopic(req.GetInit().Topic)
@@ -36,23 +40,29 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ glog.V(4).Infof("Calling GetOrGenerateLocalPartition for %s %s", t, partition)
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
+ glog.V(4).Infof("GetOrGenerateLocalPartition failed: %v", getOrGenErr)
return getOrGenErr
}
+ glog.V(4).Infof("GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil)
+ if localTopicPartition == nil {
+ return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition)
+ }
subscriber := topic.NewLocalSubscriber()
localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
- sleepIntervalCount := 0
var counter int64
defer func() {
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
- if localTopicPartition.MaybeShutdownLocalPartition() {
+ // Use topic-aware shutdown logic to prevent aggressive removal of system topics
+ if localTopicPartition.MaybeShutdownLocalPartitionForTopic(t.Name) {
b.localTopicManager.RemoveLocalPartition(t, partition)
}
}()
@@ -116,12 +126,12 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// skip ack for control messages
continue
}
- imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
+ imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().TsNs)
currentLastOffset := imt.GetOldestAckedTimestamp()
// Update acknowledged offset and last seen time for this subscriber when it sends an ack
subscriber.UpdateAckedOffset(currentLastOffset)
- // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
+ // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().TsNs, currentLastOffset)
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
@@ -156,35 +166,48 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}()
- return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
- if !isConnected {
- return false
- }
- sleepIntervalCount++
- if sleepIntervalCount > 32 {
- sleepIntervalCount = 32
- }
- time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
+ var cancelOnce sync.Once
- // Check if the client has disconnected by monitoring the context
+ err = localTopicPartition.Subscribe(clientName, startPosition, func() bool {
+ // Check if context is cancelled FIRST before any blocking operations
select {
case <-ctx.Done():
- err := ctx.Err()
- if errors.Is(err, context.Canceled) {
- // Client disconnected
- return false
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
- // Continue processing the request
}
+ if !isConnected {
+ return false
+ }
+
+ // Ensure we will wake any Wait() when the client disconnects
+ cancelOnce.Do(func() {
+ go func() {
+ <-ctx.Done()
+ localTopicPartition.ListenersLock.Lock()
+ localTopicPartition.ListenersCond.Broadcast()
+ localTopicPartition.ListenersLock.Unlock()
+ }()
+ })
+
+ // Block until new data is available or the client disconnects
+ localTopicPartition.ListenersLock.Lock()
+ atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
+ localTopicPartition.ListenersCond.Wait()
+ atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
+ localTopicPartition.ListenersLock.Unlock()
+
+ // Add a small sleep to avoid CPU busy-wait when checking for new data
+ time.Sleep(10 * time.Millisecond)
+
+ if ctx.Err() != nil {
+ return false
+ }
+ if !isConnected {
+ return false
+ }
return true
}, func(logEntry *filer_pb.LogEntry) (bool, error) {
- // reset the sleep interval count
- sleepIntervalCount = 0
-
for imt.IsInflight(logEntry.Key) {
time.Sleep(137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
@@ -205,12 +228,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
}
+ // Create the message to send
+ dataMsg := &mq_pb.DataMessage{
+ Key: logEntry.Key,
+ Value: logEntry.Data,
+ TsNs: logEntry.TsNs,
+ }
+
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
+ Data: dataMsg,
}}); err != nil {
glog.Errorf("Error sending data: %v", err)
return false, err
@@ -222,6 +248,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
counter++
return false, nil
})
+
+ return err
}
func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
@@ -247,6 +275,18 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
return
}
+ // use exact offset (native offset-based positioning)
+ if offsetType == schema_pb.OffsetType_EXACT_OFFSET {
+ startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
+ return
+ }
+
+ // reset to specific offset
+ if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET {
+ startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
+ return
+ }
+
// try to resume
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)