diff options
Diffstat (limited to 'weed/mq/client/sub_client')
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 13 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 11 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscriber.go | 7 |
3 files changed, 17 insertions, 14 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index b6d6e90b5..470e886d2 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -4,16 +4,17 @@ import ( "context" "errors" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "io" ) -type KeyedOffset struct { - Key []byte - Offset int64 +type KeyedTimestamp struct { + Key []byte + TsNs int64 // Timestamp in nanoseconds for acknowledgment } func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error { @@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ - Key: ack.Key, - Sequence: ack.Offset, + Key: ack.Key, + TsNs: ack.TsNs, }, }, }) diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index d4dea3852..0f3f9b5ee 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,12 +1,13 @@ package sub_client import ( + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "sync" - "time" ) type ProcessorState struct { @@ -75,9 +76,9 @@ func (sub *TopicSubscriber) startProcessors() { if sub.OnDataMessageFunc != nil { sub.OnDataMessageFunc(m) } - sub.PartitionOffsetChan <- KeyedOffset{ - Key: m.Data.Key, - Offset: m.Data.TsNs, + sub.PartitionOffsetChan <- KeyedTimestamp{ + Key: m.Data.Key, + TsNs: m.Data.TsNs, } }) } diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go index ec15d998e..68bf74c5e 100644 --- a/weed/mq/client/sub_client/subscriber.go +++ b/weed/mq/client/sub_client/subscriber.go @@ -2,11 +2,12 @@ package sub_client import ( "context" + "sync" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" - "sync" ) type SubscriberConfiguration struct { @@ -44,10 +45,10 @@ type TopicSubscriber struct { bootstrapBrokers []string activeProcessors map[topic.Partition]*ProcessorState activeProcessorsLock sync.Mutex - PartitionOffsetChan chan KeyedOffset + PartitionOffsetChan chan KeyedTimestamp } -func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber { +func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedTimestamp) *TopicSubscriber { return &TopicSubscriber{ ctx: ctx, SubscriberConfig: subscriber, |
