aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/pub_chan.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-05-17 17:39:16 -0700
committerGitHub <noreply@github.com>2020-05-17 17:39:16 -0700
commite0e31e67a809d00c99edaa299531c7ce4d4750dc (patch)
tree0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/messaging/msgclient/pub_chan.go
parentb4e02ec525a6ec87b26686202307896faf3296a7 (diff)
parent081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff)
downloadseaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz
seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/messaging/msgclient/pub_chan.go')
-rw-r--r--weed/messaging/msgclient/pub_chan.go76
1 files changed, 76 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go
new file mode 100644
index 000000000..9bc88f7c0
--- /dev/null
+++ b/weed/messaging/msgclient/pub_chan.go
@@ -0,0 +1,76 @@
+package msgclient
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+ client messaging_pb.SeaweedMessaging_PublishClient
+ grpcConnection *grpc.ClientConn
+ md5hash hash.Hash
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := setupPublisherClient(grpcConnection, tp)
+ if err != nil {
+ return nil, err
+ }
+ return &PubChannel{
+ client: pc,
+ grpcConnection: grpcConnection,
+ md5hash: md5.New(),
+ }, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+ err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ Value: m,
+ },
+ })
+ if err == nil {
+ pc.md5hash.Write(m)
+ }
+ return err
+}
+func (pc *PubChannel) Close() error {
+
+ // println("send closing")
+ if err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ IsClose: true,
+ },
+ }); err != nil {
+ log.Printf("err send close: %v", err)
+ }
+ // println("receive closing")
+ if _, err := pc.client.Recv(); err != nil && err != io.EOF {
+ log.Printf("err receive close: %v", err)
+ }
+ // println("close connection")
+ if err := pc.grpcConnection.Close(); err != nil {
+ log.Printf("err connection close: %v", err)
+ }
+ return nil
+}
+
+func (pc *PubChannel) Md5() []byte {
+ return pc.md5hash.Sum(nil)
+}