diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-01 22:43:25 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:22:06 -0700 |
| commit | 21b6b07dd8d0379d835f9d9c1259155a12f1e61b (patch) | |
| tree | c3b13d69cac50afc227b1a06d34082cf3598f98a /weed/mq/msgclient/chan_pub.go | |
| parent | 8c4edf7b4014b157ee269419febe57af9cd67618 (diff) | |
| download | seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.tar.xz seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.zip | |
renaming
Diffstat (limited to 'weed/mq/msgclient/chan_pub.go')
| -rw-r--r-- | weed/mq/msgclient/chan_pub.go | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/weed/mq/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go new file mode 100644 index 000000000..f4ffe832a --- /dev/null +++ b/weed/mq/msgclient/chan_pub.go @@ -0,0 +1,76 @@ +package msgclient + +import ( + "crypto/md5" + "hash" + "io" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" +) + +type PubChannel struct { + client mq_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn + md5hash hash.Hash +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + grpcConnection: grpcConnection, + md5hash: md5.New(), + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ + Value: m, + }, + }) + if err == nil { + pc.md5hash.Write(m) + } + return err +} +func (pc *PubChannel) Close() error { + + // println("send closing") + if err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil +} + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} |
