aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/agent/agent_grpc_subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/agent/agent_grpc_subscribe.go')
-rw-r--r--weed/mq/agent/agent_grpc_subscribe.go9
1 files changed, 5 insertions, 4 deletions
diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go
index 87baa466c..2deaab9c2 100644
--- a/weed/mq/agent/agent_grpc_subscribe.go
+++ b/weed/mq/agent/agent_grpc_subscribe.go
@@ -2,6 +2,7 @@ package agent
import (
"context"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -67,9 +68,9 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA
return err
}
if m != nil {
- subscriber.PartitionOffsetChan <- sub_client.KeyedOffset{
- Key: m.AckKey,
- Offset: m.AckSequence,
+ subscriber.PartitionOffsetChan <- sub_client.KeyedTimestamp{
+ Key: m.AckKey,
+ TsNs: m.AckSequence, // Note: AckSequence should be renamed to AckTsNs in agent protocol
}
}
}
@@ -98,7 +99,7 @@ func (a *MessageQueueAgent) handleInitSubscribeRecordRequest(ctx context.Context
a.brokersList(),
subscriberConfig,
contentConfig,
- make(chan sub_client.KeyedOffset, 1024),
+ make(chan sub_client.KeyedTimestamp, 1024),
)
return topicSubscriber