diff options
Diffstat (limited to 'weed/messaging/client/subscriber.go')
| -rw-r--r-- | weed/messaging/client/subscriber.go | 44 |
1 files changed, 37 insertions, 7 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 407cd4ac6..ddf1f82e6 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -9,10 +9,33 @@ import ( ) type Subscriber struct { - subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberId string } func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + if err != nil { + return nil, err + } + subscriberClients[i] = client + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberId: subscriberId, + }, nil +} + +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { return nil, err @@ -23,7 +46,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) Init: &messaging_pb.SubscriberMessage_InitMessage{ Namespace: namespace, Topic: topic, - Partition: 0, + Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: time.Now().UnixNano(), SubscriberId: subscriberId, @@ -42,20 +65,27 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) // TODO follow redirection } - return &Subscriber{ - subscriberClient: stream, - }, nil + return stream, nil + } -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error { +func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClient.Recv() + resp, listenErr := s.subscriberClients[partition].Recv() if listenErr == io.EOF { return nil } if listenErr != nil { + println(listenErr.Error()) return listenErr } processFn(resp.Data) } } + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + for i:=0;i<len(s.subscriberClients);i++{ + go s.doSubscribe(i, processFn) + } +} |
