aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/publish.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/publish.go')
-rw-r--r--weed/mq/client/pub_client/publish.go32
1 files changed, 8 insertions, 24 deletions
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1e250ede3..3b9817e74 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -7,35 +7,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
+
func (p *TopicPublisher) Publish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
- publishClient, found := p.partition2Broker.Floor(hashKey, hashKey)
+ inputBuffer, found := p.partition2Buffer.Floor(hashKey+1, hashKey+1)
if !found {
- return fmt.Errorf("no broker found for key %d", hashKey)
- }
- p.Lock()
- defer p.Unlock()
- // dead lock here
- //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59)
- //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047)
- //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040)
- //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892)
- //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752)
- //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894)
- //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141)
- //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19)
- if err := publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Data{
- Data: &mq_pb.DataMessage{
- Key: key,
- Value: value,
- },
- },
- }); err != nil {
- return fmt.Errorf("send publish request: %v", err)
+ return fmt.Errorf("no input buffer found for key %d", hashKey)
}
- return nil
+
+ return inputBuffer.Enqueue(&mq_pb.DataMessage{
+ Key: key,
+ Value: value,
+ })
}