aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/chan_sub.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-17 17:42:42 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-17 17:42:42 -0700
commitd3925d086c9ca3f30b5f4c46a92767fe99522ccb (patch)
tree1baee09cb990cc80d7d1dc21cd7482728967b2fb /weed/messaging/msgclient/chan_sub.go
parente0e31e67a809d00c99edaa299531c7ce4d4750dc (diff)
downloadseaweedfs-d3925d086c9ca3f30b5f4c46a92767fe99522ccb.tar.xz
seaweedfs-d3925d086c9ca3f30b5f4c46a92767fe99522ccb.zip
add delete channel
Diffstat (limited to 'weed/messaging/msgclient/chan_sub.go')
-rw-r--r--weed/messaging/msgclient/chan_sub.go85
1 files changed, 85 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go
new file mode 100644
index 000000000..213ff4666
--- /dev/null
+++ b/weed/messaging/msgclient/chan_sub.go
@@ -0,0 +1,85 @@
+package msgclient
+
+import (
+ "context"
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type SubChannel struct {
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+ md5hash hash.Hash
+ cancel context.CancelFunc
+}
+
+func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
+ if err != nil {
+ return nil, err
+ }
+
+ t := &SubChannel{
+ ch: make(chan []byte),
+ stream: sc,
+ md5hash: md5.New(),
+ cancel: cancel,
+ }
+
+ go func() {
+ for {
+ resp, subErr := t.stream.Recv()
+ if subErr == io.EOF {
+ return
+ }
+ if subErr != nil {
+ log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+ return
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ if resp.Data.IsClose {
+ t.stream.Send(&messaging_pb.SubscriberMessage{
+ IsClose: true,
+ })
+ close(t.ch)
+ cancel()
+ return
+ }
+ t.ch <- resp.Data.Value
+ t.md5hash.Write(resp.Data.Value)
+ }
+ }()
+
+ return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+ return sc.ch
+}
+
+func (sc *SubChannel) Md5() []byte {
+ return sc.md5hash.Sum(nil)
+}
+
+func (sc *SubChannel) Cancel() {
+ sc.cancel()
+}