diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-17 17:42:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-17 17:42:42 -0700 |
| commit | d3925d086c9ca3f30b5f4c46a92767fe99522ccb (patch) | |
| tree | 1baee09cb990cc80d7d1dc21cd7482728967b2fb /weed/messaging/msgclient/chan_sub.go | |
| parent | e0e31e67a809d00c99edaa299531c7ce4d4750dc (diff) | |
| download | seaweedfs-d3925d086c9ca3f30b5f4c46a92767fe99522ccb.tar.xz seaweedfs-d3925d086c9ca3f30b5f4c46a92767fe99522ccb.zip | |
add delete channel
Diffstat (limited to 'weed/messaging/msgclient/chan_sub.go')
| -rw-r--r-- | weed/messaging/msgclient/chan_sub.go | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/messaging/msgclient/chan_sub.go new file mode 100644 index 000000000..213ff4666 --- /dev/null +++ b/weed/messaging/msgclient/chan_sub.go @@ -0,0 +1,85 @@ +package msgclient + +import ( + "context" + "crypto/md5" + "hash" + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient + md5hash hash.Hash + cancel context.CancelFunc +} + +func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + md5hash: md5.New(), + cancel: cancel, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.Data == nil { + // this could be heartbeat from broker + continue + } + if resp.Data.IsClose { + t.stream.Send(&messaging_pb.SubscriberMessage{ + IsClose: true, + }) + close(t.ch) + cancel() + return + } + t.ch <- resp.Data.Value + t.md5hash.Write(resp.Data.Value) + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +} + +func (sc *SubChannel) Md5() []byte { + return sc.md5hash.Sum(nil) +} + +func (sc *SubChannel) Cancel() { + sc.cancel() +} |
