Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--  weed/mq/broker/broker_grpc_sub.go  102
1 file changed, 71 insertions, 31 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index a9fdaaf9f..0d3298ae8 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -2,9 +2,10 @@ package broker
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
+	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -28,7 +29,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		return fmt.Errorf("missing init message")
 	}
 
-	ctx := stream.Context()
+	// Create a cancellable context so we can properly clean up when the client disconnects
+	ctx, cancel := context.WithCancel(stream.Context())
+	defer cancel() // Ensure context is cancelled when function exits
+
 	clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
 
 	t := topic.FromPbTopic(req.GetInit().Topic)
@@ -36,23 +40,29 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 
 	glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
 
+	glog.V(4).Infof("Calling GetOrGenerateLocalPartition for %s %s", t, partition)
 	localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
 	if getOrGenErr != nil {
+		glog.V(4).Infof("GetOrGenerateLocalPartition failed: %v", getOrGenErr)
 		return getOrGenErr
 	}
+	glog.V(4).Infof("GetOrGenerateLocalPartition succeeded, localTopicPartition=%v", localTopicPartition != nil)
+	if localTopicPartition == nil {
+		return fmt.Errorf("failed to get or generate local partition for topic %v partition %v", t, partition)
+	}
 
 	subscriber := topic.NewLocalSubscriber()
 	localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
 	glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
 	isConnected := true
-	sleepIntervalCount := 0
 
 	var counter int64
 	defer func() {
 		isConnected = false
 		localTopicPartition.Subscribers.RemoveSubscriber(clientName)
 		glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
-		if localTopicPartition.MaybeShutdownLocalPartition() {
+		// Use topic-aware shutdown logic to prevent aggressive removal of system topics
+		if localTopicPartition.MaybeShutdownLocalPartitionForTopic(t.Name) {
			b.localTopicManager.RemoveLocalPartition(t, partition)
 		}
 	}()
@@ -116,12 +126,12 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 				// skip ack for control messages
 				continue
 			}
-			imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
+			imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().TsNs)
 			currentLastOffset := imt.GetOldestAckedTimestamp()
 
 			// Update acknowledged offset and last seen time for this subscriber when it sends an ack
 			subscriber.UpdateAckedOffset(currentLastOffset)
-			// fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
+			// fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().TsNs, currentLastOffset)
 			if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
 				if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
 					Message: &mq_pb.SubscribeFollowMeRequest_Ack{
@@ -156,35 +166,48 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		}
 	}()
 
-	return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
-		if !isConnected {
-			return false
-		}
-		sleepIntervalCount++
-		if sleepIntervalCount > 32 {
-			sleepIntervalCount = 32
-		}
-		time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
+	var cancelOnce sync.Once
 
-		// Check if the client has disconnected by monitoring the context
+	err = localTopicPartition.Subscribe(clientName, startPosition, func() bool {
+		// Check if context is cancelled FIRST before any blocking operations
 		select {
 		case <-ctx.Done():
-			err := ctx.Err()
-			if errors.Is(err, context.Canceled) {
-				// Client disconnected
-				return false
-			}
-			glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
 			return false
 		default:
-			// Continue processing the request
 		}
+		if !isConnected {
+			return false
+		}
+
+		// Ensure we will wake any Wait() when the client disconnects
+		cancelOnce.Do(func() {
+			go func() {
+				<-ctx.Done()
+				localTopicPartition.ListenersLock.Lock()
+				localTopicPartition.ListenersCond.Broadcast()
+				localTopicPartition.ListenersLock.Unlock()
+			}()
+		})
+
+		// Block until new data is available or the client disconnects
+		localTopicPartition.ListenersLock.Lock()
+		atomic.AddInt64(&localTopicPartition.ListenersWaits, 1)
+		localTopicPartition.ListenersCond.Wait()
+		atomic.AddInt64(&localTopicPartition.ListenersWaits, -1)
+		localTopicPartition.ListenersLock.Unlock()
+
+		// Add a small sleep to avoid CPU busy-wait when checking for new data
+		time.Sleep(10 * time.Millisecond)
+
+		if ctx.Err() != nil {
+			return false
+		}
+		if !isConnected {
+			return false
+		}
 
 		return true
 	}, func(logEntry *filer_pb.LogEntry) (bool, error) {
-		// reset the sleep interval count
-		sleepIntervalCount = 0
-
 		for imt.IsInflight(logEntry.Key) {
 			time.Sleep(137 * time.Millisecond)
 			// Check if the client has disconnected by monitoring the context
@@ -205,12 +228,15 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 			imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
 		}
 
+		// Create the message to send
+		dataMsg := &mq_pb.DataMessage{
+			Key:   logEntry.Key,
+			Value: logEntry.Data,
+			TsNs:  logEntry.TsNs,
+		}
+
 		if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
-			Data: &mq_pb.DataMessage{
-				Key:   logEntry.Key,
-				Value: logEntry.Data,
-				TsNs:  logEntry.TsNs,
-			},
+			Data: dataMsg,
 		}}); err != nil {
 			glog.Errorf("Error sending data: %v", err)
 			return false, err
@@ -222,6 +248,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
 		counter++
 		return false, nil
 	})
+
+	return err
 }
 
 func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
@@ -247,6 +275,18 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
 		return
 	}
 
+	// use exact offset (native offset-based positioning)
+	if offsetType == schema_pb.OffsetType_EXACT_OFFSET {
+		startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
+		return
+	}
+
+	// reset to specific offset
+	if offsetType == schema_pb.OffsetType_RESET_TO_OFFSET {
+		startPosition = log_buffer.NewMessagePositionFromOffset(offset.StartOffset)
+		return
+	}
+
 	// try to resume
 	if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
 		glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
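The core change above replaces the subscribe loop's escalating sleep poll with a block on the partition's listener condition variable; a one-shot watcher goroutine broadcasts when the stream context is cancelled so that a disconnecting client still wakes the waiter. Below is a minimal, self-contained sketch of that wake-on-cancel pattern; the partition type, notify, and waitForData names are hypothetical stand-ins for illustration, not the SeaweedFS types in the diff.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// partition is a hypothetical stand-in for the broker's local partition:
// cond guards "new data" notifications and waiters counts blocked subscribers.
type partition struct {
	mu      sync.Mutex
	cond    *sync.Cond
	waiters int64
}

func newPartition() *partition {
	p := &partition{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// notify is the publisher side: wake every subscriber blocked in waitForData.
func (p *partition) notify() {
	p.mu.Lock()
	p.cond.Broadcast()
	p.mu.Unlock()
}

// waitForData blocks until the publisher broadcasts or ctx is cancelled.
// sync.Cond.Wait does not observe contexts, so a watcher goroutine (started
// at most once per subscriber, like cancelOnce in the diff) broadcasts when
// ctx.Done() fires, guaranteeing the waiter wakes up and sees the cancellation.
func (p *partition) waitForData(ctx context.Context, wakeOnce *sync.Once) bool {
	wakeOnce.Do(func() {
		go func() {
			<-ctx.Done()
			p.notify()
		}()
	})

	p.mu.Lock()
	if ctx.Err() != nil { // cancelled before we even started waiting
		p.mu.Unlock()
		return false
	}
	atomic.AddInt64(&p.waiters, 1)
	p.cond.Wait() // releases mu while blocked, reacquires before returning
	atomic.AddInt64(&p.waiters, -1)
	p.mu.Unlock()

	return ctx.Err() == nil // false once the subscriber has disconnected
}

func main() {
	p := newPartition()
	ctx, cancel := context.WithCancel(context.Background())
	var wakeOnce sync.Once

	// Publisher: signal new data twice, then simulate the client disconnecting.
	go func() {
		for i := 0; i < 2; i++ {
			time.Sleep(50 * time.Millisecond)
			p.notify()
		}
		cancel()
	}()

	// Subscriber: loop until waitForData reports the disconnect.
	for p.waitForData(ctx, &wakeOnce) {
		fmt.Println("woke up: would read newly available log entries here")
	}
	fmt.Println("subscriber disconnected, cleaning up")
}

Broadcasting under the same lock that guards Wait, and checking ctx.Err() under that lock before waiting, closes the window in which a cancellation could otherwise be missed between the check and Wait. The waiters counter mirrors the ListenersWaits bookkeeping in the diff, which a publisher can presumably consult to skip a Broadcast when nobody is blocked.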

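The ack path now keys acknowledgements by message timestamp (TsNs) instead of a sequence number, and the oldest fully acknowledged timestamp is what gets forwarded on the follow-me stream and persisted as the consumer group offset. The sketch below is a simplified, hypothetical stand-in for the tracker behind imt (the real SeaweedFS implementation is more involved); it only illustrates the low-watermark semantics implied by the calls in the diff.

package main

import (
	"fmt"
	"sync"
)

// inflightTracker records each sent message's timestamp, marks acks by
// (key, tsNs), and exposes the oldest timestamp up to which every message
// has been acknowledged. Names and structure are illustrative only.
type inflightTracker struct {
	mu    sync.Mutex
	sent  []entry        // in send order; timestamps assumed monotonically increasing
	byKey map[string]int // key -> index into sent for the message still awaiting an ack
}

type entry struct {
	key   string
	tsNs  int64
	acked bool
}

func newInflightTracker() *inflightTracker {
	return &inflightTracker{byKey: make(map[string]int)}
}

// EnflightMessage records a message that has been sent but not yet acked.
func (t *inflightTracker) EnflightMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.sent = append(t.sent, entry{key: string(key), tsNs: tsNs})
	t.byKey[string(key)] = len(t.sent) - 1
}

// IsInflight reports whether this key still awaits an ack; the send loop in
// the diff waits on this before delivering another message with the same key.
func (t *inflightTracker) IsInflight(key []byte) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.byKey[string(key)]
	return ok
}

// AcknowledgeMessage marks the message identified by (key, tsNs) as acked.
func (t *inflightTracker) AcknowledgeMessage(key []byte, tsNs int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if i, ok := t.byKey[string(key)]; ok && t.sent[i].tsNs == tsNs {
		t.sent[i].acked = true
		delete(t.byKey, string(key))
	}
}

// GetOldestAckedTimestamp returns a low-watermark: every message at or before
// this timestamp has been acknowledged, so it is safe to use as the consumer
// group's resume position.
func (t *inflightTracker) GetOldestAckedTimestamp() int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	var watermark int64
	for _, e := range t.sent {
		if !e.acked {
			break
		}
		watermark = e.tsNs
	}
	return watermark
}

func main() {
	imt := newInflightTracker()
	imt.EnflightMessage([]byte("k1"), 100)
	imt.EnflightMessage([]byte("k2"), 200)

	imt.AcknowledgeMessage([]byte("k2"), 200)
	fmt.Println(imt.GetOldestAckedTimestamp()) // 0: k1 (ts 100) is still in flight

	imt.AcknowledgeMessage([]byte("k1"), 100)
	fmt.Println(imt.GetOldestAckedTimestamp()) // 200: everything up to ts 200 is acked
}

Tracking the watermark this way is why an out-of-order ack (k2 before k1) does not advance the stored offset: the position only moves once every earlier message has also been acknowledged.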