diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-16 08:57:29 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-16 08:57:29 -0700 |
| commit | b0de01ff3b91cf28edbbaa536eed0e02db4d492c (patch) | |
| tree | a659404528abd4164b051049c43d4de83242ee11 /weed/messaging/msgclient/subscriber.go | |
| parent | 7aafb9e3f800a2cf2ad704b291887b9ea89507d4 (diff) | |
| download | seaweedfs-b0de01ff3b91cf28edbbaa536eed0e02db4d492c.tar.xz seaweedfs-b0de01ff3b91cf28edbbaa536eed0e02db4d492c.zip | |
able to detect disconnected subscribers
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index efbfa0337..2e66923e2 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -5,6 +5,7 @@ import ( "io" "time" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "google.golang.org/grpc" ) @@ -14,7 +15,6 @@ type Subscriber struct { subscriberId string } -/* func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ @@ -23,7 +23,16 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + client, err := setupSubscriberClient(grpcClientConn, tp, subscriberId, startTime) if err != nil { return nil, err } @@ -36,22 +45,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, }, nil } -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - - stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime) - if err != nil { - return stream, err - } - if newBroker != nil { - - } - - return stream, nil - -} -*/ - -func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { +func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) if err != nil { return @@ -60,9 +54,9 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, // send init message err = stream.Send(&messaging_pb.SubscriberMessage{ Init: &messaging_pb.SubscriberMessage_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: startTime.UnixNano(), SubscriberId: subscriberId, @@ -90,6 +84,10 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M println(listenErr.Error()) return listenErr } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } processFn(resp.Data) } } |
