aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-16 20:28:19 -0700
committerchrislu <chris.lu@gmail.com>2024-05-16 20:28:19 -0700
commit279cb2b85e923e3012bd8a68d21dad1d28a07588 (patch)
tree16eff564f817e4ab186c72c8c69c3ed318486787
parentfaffb2973c523253247a2ada4fd27408fcfee142 (diff)
downloadseaweedfs-279cb2b85e923e3012bd8a68d21dad1d28a07588.tar.xz
seaweedfs-279cb2b85e923e3012bd8a68d21dad1d28a07588.zip
consumer acks received messages
-rw-r--r--weed/mq/broker/broker_grpc_sub.go30
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go3
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go38
-rw-r--r--weed/mq/client/sub_client/subscribe.go7
4 files changed, 57 insertions, 21 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 02488b2b0..f2214b9df 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,20 +8,24 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "io"
"time"
)
-func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
+func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
- ctx := stream.Context()
- clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
-
- initMessage := req.GetInit()
- if initMessage == nil {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+ if req.GetInit() == nil {
glog.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
+ ctx := stream.Context()
+ clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
@@ -52,6 +56,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
+ go func() {
+ for {
+ ack, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ glog.V(0).Infof("topic %v partition %v subscriber %s error: %v", t, partition, clientName, err)
+ break
+ }
+ println(clientName, "ack =>", ack.GetAck().Sequence)
+ }
+ }()
+
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
if !isConnected {
return false
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index 50470f879..2442709d6 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -49,8 +49,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
- println("flushing at", logBuffer.LastFlushTsNs, "to", targetFile, "size", len(buf))
-
+ glog.V(0).Infof("flushing at %s to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
}
}
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 328968c89..e62ce5265 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -88,7 +88,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
// connect to the partition broker
return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
+
+ subscribeClient, err := client.SubscribeMessage(context.Background())
+ if err != nil {
+ return fmt.Errorf("create subscribe client: %v", err)
+ }
+
+ if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
@@ -103,25 +109,32 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
FollowerBrokers: assigned.FollowerBrokers,
},
},
- })
-
- if err != nil {
- return fmt.Errorf("create subscribe client: %v", err)
+ });err != nil {
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
- glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
}
+
+ partitionOffsetChan:= make(chan int64, 1024)
+
defer func() {
- subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Ack{
- Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Sequence: 0,
+ close(partitionOffsetChan)
+ }()
+
+ go func() {
+ for ack := range partitionOffsetChan {
+ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Ack{
+ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+ Sequence: ack,
+ },
},
- },
- })
+ })
+ }
subscribeClient.CloseSend()
}()
@@ -142,6 +155,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt.Errorf("process error: %v", processErr)
}
sub.alreadyProcessedTsNs = m.Data.TsNs
+ partitionOffsetChan <- m.Data.TsNs
if !shouldContinue {
return nil
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index b788faeb5..950d2214c 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -73,15 +73,20 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti
for foundOverlapping {
sub.activeProcessorsLock.Lock()
foundOverlapping = false
+ var overlappedPartition topic.Partition
for partition, _ := range sub.activeProcessors {
if partition.Overlaps(topicPartition) {
+ if partition.Equals(topicPartition) {
+ continue
+ }
foundOverlapping = true
+ overlappedPartition = partition
break
}
}
sub.activeProcessorsLock.Unlock()
if foundOverlapping {
- glog.V(0).Infof("subscriber %s/%s waiting for partition %+v to complete", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, topicPartition)
+ glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
time.Sleep(1 * time.Second)
}
}