aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/subscriber.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-05-17 17:39:16 -0700
committerGitHub <noreply@github.com>2020-05-17 17:39:16 -0700
commite0e31e67a809d00c99edaa299531c7ce4d4750dc (patch)
tree0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/messaging/msgclient/subscriber.go
parentb4e02ec525a6ec87b26686202307896faf3296a7 (diff)
parent081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff)
downloadseaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz
seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
-rw-r--r--weed/messaging/msgclient/subscriber.go120
1 files changed, 120 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
new file mode 100644
index 000000000..caa795626
--- /dev/null
+++ b/weed/messaging/msgclient/subscriber.go
@@ -0,0 +1,120 @@
+package msgclient
+
+import (
+ "context"
+ "io"
+ "time"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "google.golang.org/grpc"
+)
+
+type Subscriber struct {
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberCancels []context.CancelFunc
+ subscriberId string
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+ subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ if partitionId>=0 && i != partitionId {
+ continue
+ }
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ subscriberCancels[i] = cancel
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberCancels: subscriberCancels,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
+ if err != nil {
+ return
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: startTime.UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ return stream, nil
+}
+
+func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := subscriberClient.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ println(listenErr.Error())
+ return listenErr
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ processFn(resp.Data)
+ }
+}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ var wg sync.WaitGroup
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberClients[i] != nil {
+ wg.Add(1)
+ go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
+ defer wg.Done()
+ doSubscribe(subscriberClient, processFn)
+ }(s.subscriberClients[i])
+ }
+ }
+ wg.Wait()
+}
+
+func (s *Subscriber) Shutdown() {
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberCancels[i] != nil {
+ s.subscriberCancels[i]()
+ }
+ }
+}