aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-09 00:43:53 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-09 00:43:53 -0700
commitd693e7741852db9f77b542d9e07a3a6620448a83 (patch)
treebeffb106ea781333d7cc48f87deb8d6334ac3fd1
parent07d7abe428186c7771f51589cc397ecefa6453d2 (diff)
downloadseaweedfs-d693e7741852db9f77b542d9e07a3a6620448a83.tar.xz
seaweedfs-d693e7741852db9f77b542d9e07a3a6620448a83.zip
add pub sub md5
-rw-r--r--weed/messaging/broker/broker_grpc_server.go2
-rw-r--r--weed/messaging/msgclient/pub_chan.go14
-rw-r--r--weed/messaging/msgclient/sub_chan.go17
3 files changed, 27 insertions, 6 deletions
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
index 305213622..6918a28a6 100644
--- a/weed/messaging/broker/broker_grpc_server.go
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -33,5 +33,5 @@ func genTopicDir(namespace, topic string) string {
}
func genTopicDirEntry(namespace, topic string) (dir, entry string) {
- return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace), topic
+ return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic
}
diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go
index ccf301a6a..9bc88f7c0 100644
--- a/weed/messaging/msgclient/pub_chan.go
+++ b/weed/messaging/msgclient/pub_chan.go
@@ -1,6 +1,8 @@
package msgclient
import (
+ "crypto/md5"
+ "hash"
"io"
"log"
@@ -13,6 +15,7 @@ import (
type PubChannel struct {
client messaging_pb.SeaweedMessaging_PublishClient
grpcConnection *grpc.ClientConn
+ md5hash hash.Hash
}
func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
@@ -32,15 +35,20 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
return &PubChannel{
client: pc,
grpcConnection: grpcConnection,
+ md5hash: md5.New(),
}, nil
}
func (pc *PubChannel) Publish(m []byte) error {
- return pc.client.Send(&messaging_pb.PublishRequest{
+ err := pc.client.Send(&messaging_pb.PublishRequest{
Data: &messaging_pb.Message{
Value: m,
},
})
+ if err == nil {
+ pc.md5hash.Write(m)
+ }
+ return err
}
func (pc *PubChannel) Close() error {
@@ -62,3 +70,7 @@ func (pc *PubChannel) Close() error {
}
return nil
}
+
+func (pc *PubChannel) Md5() []byte {
+ return pc.md5hash.Sum(nil)
+}
diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go
index edd4d1049..aae5c0c71 100644
--- a/weed/messaging/msgclient/sub_chan.go
+++ b/weed/messaging/msgclient/sub_chan.go
@@ -1,6 +1,8 @@
package msgclient
import (
+ "crypto/md5"
+ "hash"
"io"
"log"
"time"
@@ -10,8 +12,9 @@ import (
)
type SubChannel struct {
- ch chan []byte
- stream messaging_pb.SeaweedMessaging_SubscribeClient
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+ md5hash hash.Hash
}
func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
@@ -30,8 +33,9 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
}
t := &SubChannel{
- ch: make(chan []byte),
- stream: sc,
+ ch: make(chan []byte),
+ stream: sc,
+ md5hash: md5.New(),
}
go func() {
@@ -51,6 +55,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
close(t.ch)
return
}
+ t.md5hash.Write(resp.Data.Value)
t.ch <- resp.Data.Value
}
}()
@@ -61,3 +66,7 @@ func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
func (sc *SubChannel) Channel() chan []byte {
return sc.ch
}
+
+func (sc *SubChannel) Md5() []byte {
+ return sc.md5hash.Sum(nil)
+}