aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/subscriber.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-08 02:47:22 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-08 02:47:22 -0700
commitdfccc3c2637693dce141c27a321ba5d3aea1ace9 (patch)
tree9d7bce6ec9f93c563c1086f4b7460279d6527d37 /weed/messaging/msgclient/subscriber.go
parenta8bc8eb351743ffa5032f1c65b8997b4636d67f2 (diff)
downloadseaweedfs-dfccc3c2637693dce141c27a321ba5d3aea1ace9.tar.xz
seaweedfs-dfccc3c2637693dce141c27a321ba5d3aea1ace9.zip
able to read chan and write chan
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
-rw-r--r--weed/messaging/msgclient/subscriber.go12
1 files changed, 6 insertions, 6 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
index 27fa35a5b..d3066d6ef 100644
--- a/weed/messaging/msgclient/subscriber.go
+++ b/weed/messaging/msgclient/subscriber.go
@@ -5,6 +5,7 @@ import (
"io"
"time"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -13,6 +14,7 @@ type Subscriber struct {
subscriberId string
}
+/*
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
// read topic configuration
topicConfiguration := &messaging_pb.TopicConfiguration{
@@ -36,9 +38,9 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
- stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
+ stream, err := setupSubscriberClient(subscriberId, namespace, topic, partition, startTime)
if err != nil {
- return client, err
+ return stream, err
}
if newBroker != nil {
@@ -47,6 +49,7 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic
return stream, nil
}
+*/
func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
@@ -70,13 +73,10 @@ func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string,
}
// process init response
- initResponse, err := stream.Recv()
+ _, err = stream.Recv()
if err != nil {
return
}
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
return stream, nil
}