aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2019-04-08 10:31:02 -0700
committerGitHub <noreply@github.com>2019-04-08 10:31:02 -0700
commita41ba791191fd5188c044fe540ae21550fe965c0 (patch)
tree672a369b0ea4434e21cc01e19ca43ee31f0c095e
parentd14b614407a96e3d81bc655e5164d202c7e3b959 (diff)
parent72920efc20fd758048f21a82d621a6d7bbc08066 (diff)
downloadseaweedfs-a41ba791191fd5188c044fe540ae21550fe965c0.tar.xz
seaweedfs-a41ba791191fd5188c044fe540ae21550fe965c0.zip
Merge pull request #914 from jba/gocdk-receive
replication: add GoCDK PubSub support
-rw-r--r--weed/command/scaffold.go1
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go50
2 files changed, 51 insertions, 0 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index b21641f6b..106c2dace 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -190,6 +190,7 @@ enabled = false
# The exchange must have already been created by some other means, like
# the RabbitMQ management plugin.
topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
`
REPLICATION_TOML_EXAMPLE = `
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
new file mode 100644
index 000000000..9c76e6918
--- /dev/null
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -0,0 +1,50 @@
+package sub
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "gocloud.dev/pubsub"
+ _ "gocloud.dev/pubsub/awssnssqs"
+ _ "gocloud.dev/pubsub/azuresb"
+ _ "gocloud.dev/pubsub/gcppubsub"
+ _ "gocloud.dev/pubsub/natspubsub"
+ _ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
+}
+
+type GoCDKPubSubInput struct {
+ sub *pubsub.Subscription
+}
+
+func (k *GoCDKPubSubInput) GetName() string {
+ return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSubInput) Initialize(config util.Configuration) error {
+ subURL := config.GetString("sub_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
+ sub, err := pubsub.OpenSubscription(context.Background(), subURL)
+ if err != nil {
+ return err
+ }
+ k.sub = sub
+ return nil
+}
+
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+ msg, err := k.sub.Receive(context.Background())
+ key = msg.Metadata["key"]
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Body, message)
+ if err != nil {
+ return "", nil, err
+ }
+ return key, message, nil
+}