From 07d7abe428186c7771f51589cc397ecefa6453d2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 9 May 2020 00:31:34 -0700 Subject: add deleteTopic, refactoring --- weed/messaging/msgclient/pub_chan.go | 64 ++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 weed/messaging/msgclient/pub_chan.go (limited to 'weed/messaging/msgclient/pub_chan.go') diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go new file mode 100644 index 000000000..ccf301a6a --- /dev/null +++ b/weed/messaging/msgclient/pub_chan.go @@ -0,0 +1,64 @@ +package msgclient + +import ( + "io" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type PubChannel struct { + client messaging_pb.SeaweedMessaging_PublishClient + grpcConnection *grpc.ClientConn +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + grpcConnection: grpcConnection, + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + return pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + Value: m, + }, + }) +} +func (pc *PubChannel) Close() error { + + // println("send closing") + if err := pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + IsClose: true, + }, + }); err != nil { + log.Printf("err send close: %v", err) + } + // println("receive closing") + if _, err := pc.client.Recv(); err != nil && err != io.EOF { + log.Printf("err receive close: %v", err) + } + // println("close connection") + if err := pc.grpcConnection.Close(); err != nil { + log.Printf("err connection close: %v", err) + } + return nil +} -- cgit v1.2.3 From d693e7741852db9f77b542d9e07a3a6620448a83 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 9 May 2020 00:43:53 -0700 Subject: add pub sub md5 --- weed/messaging/msgclient/pub_chan.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) (limited to 'weed/messaging/msgclient/pub_chan.go') diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go index ccf301a6a..9bc88f7c0 100644 --- a/weed/messaging/msgclient/pub_chan.go +++ b/weed/messaging/msgclient/pub_chan.go @@ -1,6 +1,8 @@ package msgclient import ( + "crypto/md5" + "hash" "io" "log" @@ -13,6 +15,7 @@ import ( type PubChannel struct { client messaging_pb.SeaweedMessaging_PublishClient grpcConnection *grpc.ClientConn + md5hash hash.Hash } func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { @@ -32,15 +35,20 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { return &PubChannel{ client: pc, grpcConnection: grpcConnection, + md5hash: md5.New(), }, nil } func (pc *PubChannel) Publish(m []byte) error { - return pc.client.Send(&messaging_pb.PublishRequest{ + err := pc.client.Send(&messaging_pb.PublishRequest{ Data: &messaging_pb.Message{ Value: m, }, }) + if err == nil { + pc.md5hash.Write(m) + } + return err } func (pc *PubChannel) Close() error { @@ -62,3 +70,7 @@ func (pc *PubChannel) Close() error { } return nil } + +func (pc *PubChannel) Md5() []byte { + return pc.md5hash.Sum(nil) +} -- cgit v1.2.3