diff options
Diffstat (limited to 'weed/messaging/msgclient')
| -rw-r--r-- | weed/messaging/msgclient/pub_chan.go (renamed from weed/messaging/msgclient/pub_sub_chan.go) | 54 | ||||
| -rw-r--r-- | weed/messaging/msgclient/sub_chan.go | 63 |
2 files changed, 63 insertions, 54 deletions
diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_chan.go index a11240080..ccf301a6a 100644 --- a/weed/messaging/msgclient/pub_sub_chan.go +++ b/weed/messaging/msgclient/pub_chan.go @@ -3,7 +3,6 @@ package msgclient import ( "io" "log" - "time" "google.golang.org/grpc" @@ -63,56 +62,3 @@ func (pc *PubChannel) Close() error { } return nil } - -type SubChannel struct { - ch chan []byte - stream messaging_pb.SeaweedMessaging_SubscribeClient -} - -func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) - if err != nil { - return nil, err - } - - t := &SubChannel{ - ch: make(chan []byte), - stream: sc, - } - - go func() { - for { - resp, subErr := t.stream.Recv() - if subErr == io.EOF { - return - } - if subErr != nil { - log.Printf("fail to receive from netchan %s: %v", chanName, subErr) - return - } - if resp.Data.IsClose { - t.stream.Send(&messaging_pb.SubscriberMessage{ - IsClose: true, - }) - close(t.ch) - return - } - t.ch <- resp.Data.Value - } - }() - - return t, nil -} - -func (sc *SubChannel) Channel() chan []byte { - return sc.ch -} diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go new file mode 100644 index 000000000..edd4d1049 --- /dev/null +++ b/weed/messaging/msgclient/sub_chan.go @@ -0,0 +1,63 @@ +package msgclient + +import ( + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient +} + +func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0, 0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.Data.IsClose { + t.stream.Send(&messaging_pb.SubscriberMessage{ + IsClose: true, + }) + close(t.ch) + return + } + t.ch <- resp.Data.Value + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +} |
