diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 31d2a8082..9cdbe8325 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -68,7 +68,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } else { defer func() { println("closing SubscribeFollowMe connection", follower) - subscribeFollowMeStream.CloseSend() + if subscribeFollowMeStream != nil { + subscribeFollowMeStream.CloseSend() + } // followerGrpcConnection.Close() }() followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection) @@ -142,7 +144,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{}, }, }); err != nil { - glog.Errorf("Error sending close to follower: %v", err) + if err != io.EOF { + glog.Errorf("Error sending close to follower: %v", err) + } } } }() @@ -178,6 +182,19 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs for imt.IsInflight(logEntry.Key) { time.Sleep(137 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return false, nil + } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return false, nil + default: + // Continue processing the request + } } if logEntry.Key != nil { imt.EnflightMessage(logEntry.Key, logEntry.TsNs) @@ -204,20 +221,35 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess return } offset := initMessage.GetPartitionOffset() - if offset.StartTsNs != 0 { + offsetType := initMessage.OffsetType + + // reset to earliest or latest + if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -3) + return + } + if offsetType == schema_pb.OffsetType_RESET_TO_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) + return + } + + // use the exact timestamp + if offsetType == schema_pb.OffsetType_EXACT_TS_NS { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) return } + + // try to resume if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } - if offset.StartType == schema_pb.PartitionOffsetStartType_EARLIEST { - startPosition = log_buffer.NewMessagePosition(1, -3) - } else if offset.StartType == schema_pb.PartitionOffsetStartType_LATEST { - startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) + if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -5) + } else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6) } return } |
