diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-16 18:53:54 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-16 18:53:54 -0700 |
| commit | 85b53ac510aca494f8a3d18bb15b829971795b15 (patch) | |
| tree | a88f17ca434862dbf58e5a310e6796795ea07bca /weed/messaging/msgclient/subscriber.go | |
| parent | 759cda0fe297770847b3a2db640bbcaba324f38d (diff) | |
| download | seaweedfs-85b53ac510aca494f8a3d18bb15b829971795b15.tar.xz seaweedfs-85b53ac510aca494f8a3d18bb15b829971795b15.zip | |
detect disconnected subscribers
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 2e66923e2..f96bba2ec 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -15,7 +15,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + if partitionId>=0 && i != partitionId { + continue + } tp := broker.TopicPartition{ Namespace: namespace, Topic: topic, @@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti return } - // process init response - _, err = stream.Recv() - if err != nil { - return - } return stream, nil } -func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClients[partition].Recv() + resp, listenErr := subscriberClient.Recv() if listenErr == io.EOF { return nil } @@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { for i := 0; i < len(s.subscriberClients); i++ { - go s.doSubscribe(i, processFn) + if s.subscriberClients[i] != nil { + go doSubscribe(s.subscriberClients[i], processFn) + } } } |
