aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2019-04-04 17:22:45 -0400
committerJonathan Amsterdam <jba@google.com>2019-04-04 17:22:45 -0400
commitc0c9a8bad51d448042a1251c5daa73aecf79109a (patch)
tree06c21a6db2dc9a2f1a273a9d64d5b0691c442283
parent715a38da1e4fce05631f230ccf09ce92c99a4fd4 (diff)
downloadseaweedfs-c0c9a8bad51d448042a1251c5daa73aecf79109a.tar.xz
seaweedfs-c0c9a8bad51d448042a1251c5daa73aecf79109a.zip
replication: add GoCDK PubSub support
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go50
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
new file mode 100644
index 000000000..c8b16e308
--- /dev/null
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -0,0 +1,50 @@
+package sub
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "gocloud.dev/pubsub"
+ _ "gocloud.dev/pubsub/awssnssqs"
+ _ "gocloud.dev/pubsub/azuresb"
+ _ "gocloud.dev/pubsub/gcppubsub"
+ _ "gocloud.dev/pubsub/natspubsub"
+ _ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
+}
+
+type GoCDKPubSubInput struct {
+ sub *pubsub.Subscription
+}
+
+func (k *GoCDKPubSubInput) GetName() string {
+ return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error {
+ subURL := config.GetString("sub_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", subURL)
+ sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+ if err != nil {
+ return err
+ }
+ k.sub = sub
+ return nil
+}
+
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+ msg, err := k.sub.Receive(context.Background())
+ key = msg.Metadata["key"]
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Body, message)
+ if err != nil {
+ return "", nil, err
+ }
+ return key, message, nil
+}