aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/messaging/client/subscriber.go61
1 files changed, 56 insertions, 5 deletions
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
index 55101a283..0b0cf58f9 100644
--- a/weed/messaging/client/subscriber.go
+++ b/weed/messaging/client/subscriber.go
@@ -1,14 +1,65 @@
package client
-import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
type Subscriber struct {
+ subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) {
+ stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: 0,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: time.Now().UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+
+ return &Subscriber{
+ subscriberClient: stream,
+ }, nil
}
-func (c *MessagingClient) NewSubscriber(namespace, topic string) *Subscriber {
- return &Subscriber{}
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := s.subscriberClient.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ processFn(resp.Data)
+ }
}
-func (p *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error{
- return nil
+func (s *Subscriber) Shutdown() {
+ s.subscriberClient.CloseSend()
}