diff options
Diffstat (limited to 'weed/mq/client/sub_client/on_each_partition.go')
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index fbfcc3c6b..14a38cfa8 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -2,14 +2,13 @@ package sub_client import ( "context" + "errors" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "io" - "reflect" - "time" ) type KeyedOffset struct { @@ -35,8 +34,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig if po == nil { po = &schema_pb.PartitionOffset{ Partition: assigned.Partition, - StartTsNs: time.Now().UnixNano(), - StartType: schema_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, + StartTsNs: sub.ContentConfig.OffsetTsNs, } } @@ -47,6 +45,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: sub.ContentConfig.Topic.ToPbTopic(), PartitionOffset: po, + OffsetType: sub.ContentConfig.OffsetType, Filter: sub.ContentConfig.Filter, FollowerBroker: assigned.FollowerBroker, SlidingWindowSize: slidingWindowSize, @@ -65,6 +64,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig go func() { for { select { + case <-sub.ctx.Done(): + subscribeClient.CloseSend() + return case <-stopCh: subscribeClient.CloseSend() return @@ -86,15 +88,27 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }() for { - // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { + if errors.Is(err, io.EOF) { + return nil + } return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } + + select { + case <-sub.ctx.Done(): + return nil + case <-stopCh: + return nil + default: + } + switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: if m.Data.Ctrl != nil { @@ -102,7 +116,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig continue } if len(m.Data.Key) == 0 { - fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m)) + // fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m)) continue } onDataMessageFn(m) |
