diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2020-05-17 17:39:16 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-17 17:39:16 -0700 |
| commit | e0e31e67a809d00c99edaa299531c7ce4d4750dc (patch) | |
| tree | 0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/messaging/msgclient/pub_chan.go | |
| parent | b4e02ec525a6ec87b26686202307896faf3296a7 (diff) | |
| parent | 081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff) | |
| download | seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip | |
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/messaging/msgclient/pub_chan.go')
| -rw-r--r-- | weed/messaging/msgclient/pub_chan.go | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go new file mode 100644 index 000000000..9bc88f7c0 --- /dev/null +++ b/weed/messaging/msgclient/pub_chan.go @@ -0,0 +1,76 @@ +package msgclient + +import ( + "crypto/md5" + "hash" + "io" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type PubChannel struct { + client messaging_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn + md5hash hash.Hash +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + grpcConnection: grpcConnection, + md5hash: md5.New(), + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + Value: m, + }, + }) + if err == nil { + pc.md5hash.Write(m) + } + return err +} +func (pc *PubChannel) Close() error { + + // println("send closing") + if err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil +} + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} |
