aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client/subscribe.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client/subscribe.go')
-rw-r--r--weed/mq/client/sub_client/subscribe.go12
1 files changed, 6 insertions, 6 deletions
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index cf2294891..d4dea3852 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -72,12 +72,12 @@ func (sub *TopicSubscriber) startProcessors() {
executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize))
onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) {
executors.Execute(func() {
- processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
- if processErr == nil {
- sub.PartitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
- }
+ if sub.OnDataMessageFunc != nil {
+ sub.OnDataMessageFunc(m)
+ }
+ sub.PartitionOffsetChan <- KeyedOffset{
+ Key: m.Data.Key,
+ Offset: m.Data.TsNs,
}
})
}