diff options
Diffstat (limited to 'weed/mq/msgclient/chan_pub.go')
| -rw-r--r-- | weed/mq/msgclient/chan_pub.go | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/weed/mq/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go new file mode 100644 index 000000000..f4ffe832a --- /dev/null +++ b/weed/mq/msgclient/chan_pub.go @@ -0,0 +1,76 @@ +package msgclient + +import ( + "crypto/md5" + "hash" + "io" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" +) + +type PubChannel struct { + client mq_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn + md5hash hash.Hash +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + grpcConnection: grpcConnection, + md5hash: md5.New(), + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ + Value: m, + }, + }) + if err == nil { + pc.md5hash.Write(m) + } + return err +} +func (pc *PubChannel) Close() error { + + // println("send closing") + if err := pc.client.Send(&mq_pb.PublishRequest{ + Data: &mq_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil +} + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} |
