aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/on_each_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/on_each_partition.go')
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go26
1 files changed, 20 insertions, 6 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index fbfcc3c6b..14a38cfa8 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -2,14 +2,13 @@ package sub_client
import (
"context"
+ "errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"io"
- "reflect"
- "time"
)
type KeyedOffset struct {
@@ -35,8 +34,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
if po == nil {
po = &schema_pb.PartitionOffset{
Partition: assigned.Partition,
- StartTsNs: time.Now().UnixNano(),
- StartType: schema_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
+ StartTsNs: sub.ContentConfig.OffsetTsNs,
}
}
@@ -47,6 +45,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: po,
+ OffsetType: sub.ContentConfig.OffsetType,
Filter: sub.ContentConfig.Filter,
FollowerBroker: assigned.FollowerBroker,
SlidingWindowSize: slidingWindowSize,
@@ -65,6 +64,9 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
go func() {
for {
select {
+ case <-sub.ctx.Done():
+ subscribeClient.CloseSend()
+ return
case <-stopCh:
subscribeClient.CloseSend()
return
@@ -86,15 +88,27 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}()
for {
- // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
+ if errors.Is(err, io.EOF) {
+ return nil
+ }
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
+
+ select {
+ case <-sub.ctx.Done():
+ return nil
+ case <-stopCh:
+ return nil
+ default:
+ }
+
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
if m.Data.Ctrl != nil {
@@ -102,7 +116,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
continue
}
if len(m.Data.Key) == 0 {
- fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
+ // fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
continue
}
onDataMessageFn(m)