diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|---|---|---|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit | d861cbd81b75b6684c971ac00e33685e6575b833 (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/messaging/msgclient/subscriber.go | |
| parent | 70da715d8d917527291b35fb069fac077d17b868 (diff) | |
| parent | 4ee58922eff61a5a4ca29c0b4829b097a498549e (diff) | |
| download | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip | |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go new file mode 100644 index 000000000..6c7dc1ab7 --- /dev/null +++ b/weed/messaging/msgclient/subscriber.go @@ -0,0 +1,120 @@ +package msgclient + +import ( + "context" + "io" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "google.golang.org/grpc" +) + +type Subscriber struct { + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberCancels []context.CancelFunc + subscriberId string +} + +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + if partitionId >= 0 && i != partitionId { + continue + } + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) + if err != nil { + return nil, err + } + subscriberClients[i] = client + subscriberCancels[i] = cancel + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberCancels: subscriberCancels, + subscriberId: subscriberId, + }, nil +} + +func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) + if err != nil { + return + } + + // send init message + err = stream.Send(&messaging_pb.SubscriberMessage{ + Init: &messaging_pb.SubscriberMessage_InitMessage{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, + StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + TimestampNs: startTime.UnixNano(), + SubscriberId: subscriberId, + }, + }) + if err != nil { + return + } + + return stream, nil +} + +func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error { + for { + resp, listenErr := subscriberClient.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + println(listenErr.Error()) + return listenErr + } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } + processFn(resp.Data) + } +} + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberClients[i] != nil { + wg.Add(1) + go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) { + defer wg.Done() + doSubscribe(subscriberClient, processFn) + }(s.subscriberClients[i]) + } + } + wg.Wait() +} + +func (s *Subscriber) Shutdown() { + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberCancels[i] != nil { + s.subscriberCancels[i]() + } + } +} |
