diff options
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 2e66923e2..f96bba2ec 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -15,7 +15,7 @@ type Subscriber struct { subscriberId string } -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { // read topic configuration topicConfiguration := &messaging_pb.TopicConfiguration{ PartitionCount: 4, @@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + if partitionId>=0 && i != partitionId { + continue + } tp := broker.TopicPartition{ Namespace: namespace, Topic: topic, @@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti return } - // process init response - _, err = stream.Recv() - if err != nil { - return - } return stream, nil } -func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClients[partition].Recv() + resp, listenErr := subscriberClient.Recv() if listenErr == io.EOF { return nil } @@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { for i := 0; i < len(s.subscriberClients); i++ { - go s.doSubscribe(i, processFn) + if s.subscriberClients[i] != nil { + go doSubscribe(s.subscriberClients[i], processFn) + } } } |
