aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client')
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go13
-rw-r--r--weed/mq/client/sub_client/subscribe.go11
-rw-r--r--weed/mq/client/sub_client/subscriber.go7
3 files changed, 17 insertions, 14 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index b6d6e90b5..470e886d2 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "io"
)
-type KeyedOffset struct {
- Key []byte
- Offset int64
+type KeyedTimestamp struct {
+ Key []byte
+ TsNs int64 // Timestamp in nanoseconds for acknowledgment
}
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
@@ -78,8 +79,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
+ Key: ack.Key,
+ TsNs: ack.TsNs,
},
},
})
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..0f3f9b5ee 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,12 +1,13 @@
package sub_client
import (
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "sync"
- "time"
)
type ProcessorState struct {
@@ -75,9 +76,9 @@ func (sub *TopicSubscriber) startProcessors() {
if sub.OnDataMessageFunc != nil {
sub.OnDataMessageFunc(m)
}
- sub.PartitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
+ sub.PartitionOffsetChan <- KeyedTimestamp{
+ Key: m.Data.Key,
+ TsNs: m.Data.TsNs,
}
})
}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index ec15d998e..68bf74c5e 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -2,11 +2,12 @@ package sub_client
import (
"context"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc"
- "sync"
)
type SubscriberConfiguration struct {
@@ -44,10 +45,10 @@ type TopicSubscriber struct {
bootstrapBrokers []string
activeProcessors map[topic.Partition]*ProcessorState
activeProcessorsLock sync.Mutex
- PartitionOffsetChan chan KeyedOffset
+ PartitionOffsetChan chan KeyedTimestamp
}
-func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedOffset) *TopicSubscriber {
+func NewTopicSubscriber(ctx context.Context, bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration, partitionOffsetChan chan KeyedTimestamp) *TopicSubscriber {
return &TopicSubscriber{
ctx: ctx,
SubscriberConfig: subscriber,