aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go46
1 files changed, 39 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 31d2a8082..9cdbe8325 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -68,7 +68,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
} else {
defer func() {
println("closing SubscribeFollowMe connection", follower)
- subscribeFollowMeStream.CloseSend()
+ if subscribeFollowMeStream != nil {
+ subscribeFollowMeStream.CloseSend()
+ }
// followerGrpcConnection.Close()
}()
followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
@@ -142,7 +144,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
},
}); err != nil {
- glog.Errorf("Error sending close to follower: %v", err)
+ if err != io.EOF {
+ glog.Errorf("Error sending close to follower: %v", err)
+ }
}
}
}()
@@ -178,6 +182,19 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
for imt.IsInflight(logEntry.Key) {
time.Sleep(137 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return false, nil
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return false, nil
+ default:
+ // Continue processing the request
+ }
}
if logEntry.Key != nil {
imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
@@ -204,20 +221,35 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
return
}
offset := initMessage.GetPartitionOffset()
- if offset.StartTsNs != 0 {
+ offsetType := initMessage.OffsetType
+
+ // reset to earliest or latest
+ if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
+ startPosition = log_buffer.NewMessagePosition(1, -3)
+ return
+ }
+ if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+ return
+ }
+
+ // use the exact timestamp
+ if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
return
}
+
+ // try to resume
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
- if offset.StartType == schema_pb.PartitionOffsetStartType_EARLIEST {
- startPosition = log_buffer.NewMessagePosition(1, -3)
- } else if offset.StartType == schema_pb.PartitionOffsetStartType_LATEST {
- startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+ if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
+ startPosition = log_buffer.NewMessagePosition(1, -5)
+ } else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
}
return
}