aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/client/subscriber.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-21 00:59:55 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-21 00:59:55 -0700
commit5c348087dc0dd9a67250f80839908b22000a5591 (patch)
treef1263b7db25b700d6616f76ea6cccdfe30710218 /weed/messaging/client/subscriber.go
parentcb3985be70c2b6eb9a0e00a04a6a02f8ebd650d5 (diff)
downloadseaweedfs-5c348087dc0dd9a67250f80839908b22000a5591.tar.xz
seaweedfs-5c348087dc0dd9a67250f80839908b22000a5591.zip
messaging: able to pub sub multiple partitions
Diffstat (limited to 'weed/messaging/client/subscriber.go')
-rw-r--r--weed/messaging/client/subscriber.go44
1 files changed, 37 insertions, 7 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
index 407cd4ac6..ddf1f82e6 100644
--- a/weed/messaging/client/subscriber.go
+++ b/weed/messaging/client/subscriber.go
@@ -9,10 +9,33 @@ import (
)
type Subscriber struct {
- subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberId string
}
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i))
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
+
stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
if err != nil {
return nil, err
@@ -23,7 +46,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
Init: &messaging_pb.SubscriberMessage_InitMessage{
Namespace: namespace,
Topic: topic,
- Partition: 0,
+ Partition: partition,
StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
TimestampNs: time.Now().UnixNano(),
SubscriberId: subscriberId,
@@ -42,20 +65,27 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string)
// TODO follow redirection
}
- return &Subscriber{
- subscriberClient: stream,
- }, nil
+ return stream, nil
+
}
-func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error {
+func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
for {
- resp, listenErr := s.subscriberClient.Recv()
+ resp, listenErr := s.subscriberClients[partition].Recv()
if listenErr == io.EOF {
return nil
}
if listenErr != nil {
+ println(listenErr.Error())
return listenErr
}
processFn(resp.Data)
}
}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ for i:=0;i<len(s.subscriberClients);i++{
+ go s.doSubscribe(i, processFn)
+ }
+}