aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client')
-rw-r--r--weed/mq/client/pub_client/publisher.go1
-rw-r--r--weed/mq/client/pub_client/scheduler.go5
2 files changed, 4 insertions, 2 deletions
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 9262d6e0c..09984bae3 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -17,6 +17,7 @@ type PublisherConfiguration struct {
CreateTopic bool
CreateTopicPartitionCount int32
Brokers []string
+ PublisherName string // for debugging
}
type PublishClient struct {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e91127522..7b4d25869 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -146,6 +146,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
Partition: job.Partition,
AckInterval: 128,
FollowerBrokers: job.FollowerBrokers,
+ PublisherName: p.config.PublisherName,
},
},
}); err != nil {
@@ -184,9 +185,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d", ackResp.AckSequence)
+ log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
- if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
return
}
}