diff options
Diffstat (limited to 'weed/mq/agent/agent_grpc_subscribe.go')
| -rw-r--r-- | weed/mq/agent/agent_grpc_subscribe.go | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index 87baa466c..2deaab9c2 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -2,6 +2,7 @@ package agent import ( "context" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -67,9 +68,9 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA return err } if m != nil { - subscriber.PartitionOffsetChan <- sub_client.KeyedOffset{ - Key: m.AckKey, - Offset: m.AckSequence, + subscriber.PartitionOffsetChan <- sub_client.KeyedTimestamp{ + Key: m.AckKey, + TsNs: m.AckSequence, // Note: AckSequence should be renamed to AckTsNs in agent protocol } } } @@ -98,7 +99,7 @@ func (a *MessageQueueAgent) handleInitSubscribeRecordRequest(ctx context.Context a.brokersList(), subscriberConfig, contentConfig, - make(chan sub_client.KeyedOffset, 1024), + make(chan sub_client.KeyedTimestamp, 1024), ) return topicSubscriber |
