aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/subscriber.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
-rw-r--r--weed/messaging/msgclient/subscriber.go18
1 files changed, 9 insertions, 9 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
index 2e66923e2..f96bba2ec 100644
--- a/weed/messaging/msgclient/subscriber.go
+++ b/weed/messaging/msgclient/subscriber.go
@@ -15,7 +15,7 @@ type Subscriber struct {
subscriberId string
}
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
// read topic configuration
topicConfiguration := &messaging_pb.TopicConfiguration{
PartitionCount: 4,
@@ -23,6 +23,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ if partitionId>=0 && i != partitionId {
+ continue
+ }
tp := broker.TopicPartition{
Namespace: namespace,
Topic: topic,
@@ -66,17 +69,12 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicParti
return
}
- // process init response
- _, err = stream.Recv()
- if err != nil {
- return
- }
return stream, nil
}
-func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
+func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
for {
- resp, listenErr := s.subscriberClients[partition].Recv()
+ resp, listenErr := subscriberClient.Recv()
if listenErr == io.EOF {
return nil
}
@@ -95,6 +93,8 @@ func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.M
// Subscribe starts goroutines to process the messages
func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
for i := 0; i < len(s.subscriberClients); i++ {
- go s.doSubscribe(i, processFn)
+ if s.subscriberClients[i] != nil {
+ go doSubscribe(s.subscriberClients[i], processFn)
+ }
}
}