aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscribe.go')
-rw-r--r--weed/mq/client/sub_client/subscribe.go11
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..0f3f9b5ee 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,12 +1,13 @@
package sub_client
import (
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "sync"
- "time"
)
type ProcessorState struct {
@@ -75,9 +76,9 @@ func (sub *TopicSubscriber) startProcessors() {
if sub.OnDataMessageFunc != nil {
sub.OnDataMessageFunc(m)
}
- sub.PartitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
+ sub.PartitionOffsetChan <- KeyedTimestamp{
+ Key: m.Data.Key,
+ TsNs: m.Data.TsNs,
}
})
}