diff options
Diffstat (limited to 'weed/mq/client/pub_client')
| -rw-r--r-- | weed/mq/client/pub_client/connect.go | 11 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/lookup.go | 4 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publish.go | 4 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publisher.go | 2 |
4 files changed, 13 insertions, 8 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go index fc7ff4d77..7f6d62a67 100644 --- a/weed/mq/client/pub_client/connect.go +++ b/weed/mq/client/pub_client/connect.go @@ -21,17 +21,17 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.Publish(context.Background()) + stream, err := brokerClient.PublishMessage(context.Background()) if err != nil { return publishClient, fmt.Errorf("create publish client: %v", err) } publishClient = &PublishClient{ - SeaweedMessaging_PublishClient: stream, + SeaweedMessaging_PublishMessageClient: stream, Broker: brokerAddress, } - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ + if err = publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Init{ + Init: &mq_pb.PublishMessageRequest_InitMessage{ Topic: &mq_pb.Topic{ Namespace: p.namespace, Name: p.topic, @@ -40,6 +40,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str RingSize: partition.RingSize, RangeStart: partition.RangeStart, RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, }, AckInterval: 128, }, diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index e55bfd256..ccc83b58d 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -3,6 +3,7 @@ package pub_client import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) @@ -39,6 +40,7 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { }, IsForPublish: true, }) + glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp) if p.config.CreateTopic && err != nil { _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ Topic: &mq_pb.Topic{ @@ -58,12 +60,14 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { }, IsForPublish: true, }) + glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp) } if err != nil { return err } for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { + glog.V(0).Infof("topic %s/%s partition %v leader %s followers %v", p.namespace, p.topic, brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker, brokerPartitionAssignment.FollowerBrokers) // partition => publishClient publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) if err != nil { diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 1e250ede3..2f4367b9d 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -27,8 +27,8 @@ func (p *TopicPublisher) Publish(key, value []byte) error { //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894) //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141) //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19) - if err := publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Data{ + if err := publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Data{ Data: &mq_pb.DataMessage{ Key: key, Value: value, diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index a0c26db36..d5176f21b 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -17,7 +17,7 @@ type PublisherConfiguration struct { } type PublishClient struct { - mq_pb.SeaweedMessaging_PublishClient + mq_pb.SeaweedMessaging_PublishMessageClient Broker string Err error } |
