aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/subscriber.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-05 02:05:28 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-05 02:05:28 -0700
commit1e3e4b3072071341b4bb4b0bb7c611457e927f97 (patch)
treed32efe43fdc34d203393d45d8343d54eaa19f180 /weed/messaging/msgclient/subscriber.go
parent47234760f40e4d2cea87b4a83d2178b8181598f5 (diff)
downloadseaweedfs-1e3e4b3072071341b4bb4b0bb7c611457e927f97.tar.xz
seaweedfs-1e3e4b3072071341b4bb4b0bb7c611457e927f97.zip
add broker connects to filer
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
-rw-r--r--weed/messaging/msgclient/subscriber.go102
1 files changed, 102 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
new file mode 100644
index 000000000..27fa35a5b
--- /dev/null
+++ b/weed/messaging/msgclient/subscriber.go
@@ -0,0 +1,102 @@
+package msgclient
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type Subscriber struct {
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberId string
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
+
+ stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
+ if err != nil {
+ return client, err
+ }
+ if newBroker != nil {
+
+ }
+
+ return stream, nil
+
+}
+
+func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
+ if err != nil {
+ return
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: partition,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: startTime.UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ return stream, nil
+}
+
+func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := s.subscriberClients[partition].Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ println(listenErr.Error())
+ return listenErr
+ }
+ processFn(resp.Data)
+ }
+}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ for i := 0; i < len(s.subscriberClients); i++ {
+ go s.doSubscribe(i, processFn)
+ }
+}