aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go20
-rw-r--r--weed/mq/broker/broker_grpc_sub.go9
2 files changed, 28 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index e74d7025f..533e32f18 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -8,11 +8,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"math/rand"
+ "sync"
"time"
)
func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){
glog.V(0).Infof("PublishFollowMe %v", request)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ var ret error
go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
followerId := rand.Int31()
subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
@@ -30,16 +34,30 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.
},
},
})
+
if err != nil {
glog.Errorf("FollowInMemoryMessages error: %v", err)
+ ret = err
return err
}
+ // receive first hello message
+ resp, err := subscribeClient.Recv()
+ if err != nil {
+ return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
+ }
+ if resp == nil {
+ glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
+ return io.ErrUnexpectedEOF
+ }
+ wg.Done()
+
b.doFollowInMemoryMessage(context.Background(), subscribeClient)
return nil
})
- return &mq_pb.PublishFollowMeResponse{}, nil
+ wg.Wait()
+ return &mq_pb.PublishFollowMeResponse{}, ret
}
func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 3280be2c0..5fd4522bd 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -184,6 +184,15 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
}()
+ // send first hello message
+ // to indicate the follower is connected
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ },
+ },
+ })
+
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())