diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-18 16:05:29 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-18 16:05:29 -0700 |
| commit | 788578d4ea0314640bfa7e4bcb00503c5d221ba7 (patch) | |
| tree | 207c85f904a2d9b1fd13f7010c7a70611e8e24d0 | |
| parent | 767f14dfcd0ec65a03f3c9029a8080e062de1c24 (diff) | |
| download | seaweedfs-788578d4ea0314640bfa7e4bcb00503c5d221ba7.tar.xz seaweedfs-788578d4ea0314640bfa7e4bcb00503c5d221ba7.zip | |
add subscriber api
| -rw-r--r-- | weed/messaging/client/subscriber.go | 61 |
1 files changed, 56 insertions, 5 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 55101a283..0b0cf58f9 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -1,14 +1,65 @@ package client -import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +import ( + "context" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) type Subscriber struct { + subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient +} + +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) + if err != nil { + return nil, err + } + + // send init message + err = stream.Send(&messaging_pb.SubscriberMessage{ + Init: &messaging_pb.SubscriberMessage_InitMessage{ + Namespace: namespace, + Topic: topic, + Partition: 0, + StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + TimestampNs: time.Now().UnixNano(), + SubscriberId: subscriberId, + }, + }) + if err != nil { + return nil, err + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return nil, err + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + + return &Subscriber{ + subscriberClient: stream, + }, nil } -func (c *MessagingClient) NewSubscriber(namespace, topic string) *Subscriber { - return &Subscriber{} +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error { + for { + resp, listenErr := s.subscriberClient.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + processFn(resp.Data) + } } -func (p *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error{ - return nil +func (s *Subscriber) Shutdown() { + s.subscriberClient.CloseSend() } |
