aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-15 00:20:21 -0800
committerchrislu <chris.lu@gmail.com>2024-01-15 00:20:21 -0800
commit026c54a9bb39f9483df0d9d74c5fc0f9a67b00da (patch)
treec0f7216c01c22272f5ea2e419587711a88ec68aa
parentfa59a5d67e175dd492f69e90289aa112c4913990 (diff)
downloadseaweedfs-026c54a9bb39f9483df0d9d74c5fc0f9a67b00da.tar.xz
seaweedfs-026c54a9bb39f9483df0d9d74c5fc0f9a67b00da.zip
fix publisher
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go5
-rw-r--r--weed/mq/client/pub_client/publish.go2
2 files changed, 5 insertions, 2 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index d1f2e7c90..01ac48d13 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -25,7 +25,10 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
// Simulate publishing a message
key := []byte(fmt.Sprintf("key-%d-%d", id, i))
value := []byte(fmt.Sprintf("value-%d-%d", id, i))
- publisher.Publish(key, value) // Call your publisher function here
+ if err := publisher.Publish(key, value); err != nil {
+ fmt.Println(err)
+ break
+ }
// println("Published", string(key), string(value))
}
elapsed := time.Since(startTime)
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 2f4367b9d..4b0dfade9 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -12,7 +12,7 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
if hashKey < 0 {
hashKey = -hashKey
}
- publishClient, found := p.partition2Broker.Floor(hashKey, hashKey)
+ publishClient, found := p.partition2Broker.Floor(hashKey+1, hashKey+1)
if !found {
return fmt.Errorf("no broker found for key %d", hashKey)
}