aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2019-03-19 10:10:43 -0400
committerJonathan Amsterdam <jba@google.com>2019-03-20 07:57:58 -0400
commit8db82e2b75eb67bc58a4869cc70fb8c5b2c0b97c (patch)
tree3244727ad476960479357208cc7e38735a06c993
parent44647a46c0dc82350cc994da0784f3c3936270d6 (diff)
downloadseaweedfs-8db82e2b75eb67bc58a4869cc70fb8c5b2c0b97c.tar.xz
seaweedfs-8db82e2b75eb67bc58a4869cc70fb8c5b2c0b97c.zip
notification: add Go CDK pubsub support
Add the gocdk_pub_sub package, which supports the Go Cloud Development Kit pubsub API. Link in all current providers. Update the notification scaffold.
-rw-r--r--weed/command/scaffold.go10
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go71
-rw-r--r--weed/server/filer_server.go4
3 files changed, 84 insertions, 1 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 9e45d7381..d72bd6f2f 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -180,6 +180,16 @@ google_application_credentials = "/path/to/x.json" # path to json credential fil
project_id = "" # an existing project id
topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin.
+topic_url = "rabbit://myexchange"
`
REPLICATION_TOML_EXAMPLE = `
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
new file mode 100644
index 000000000..94a413ac0
--- /dev/null
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -0,0 +1,71 @@
+// Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
+// which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
+// Google Cloud PubSub, and RabbitMQ.
+//
+// In the config, select a provider and topic using a URL. See
+// https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
+//
+// The Go CDK PubSub API does not support administrative operations like topic
+// creation. Create the topic using a UI, CLI or provider-specific API before running
+// weed.
+//
+// The Go CDK obtains credentials via environment variables and other
+// provider-specific default mechanisms. See the provider's documentation for
+// details.
+package gocdk_pub_sub
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "gocloud.dev/pubsub"
+ _ "gocloud.dev/pubsub/awssnssqs"
+ _ "gocloud.dev/pubsub/azuresb"
+ _ "gocloud.dev/pubsub/gcppubsub"
+ _ "gocloud.dev/pubsub/natspubsub"
+ _ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+ notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
+}
+
+type GoCDKPubSub struct {
+ topicURL string
+ topic *pubsub.Topic
+}
+
+func (k *GoCDKPubSub) GetName() string {
+ return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSub) Initialize(config util.Configuration) error {
+ k.topicURL = config.GetString("topic_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
+ topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
+ if err != nil {
+ glog.Fatalf("Failed to open topic: %v", err)
+ }
+ k.topic = topic
+ return nil
+}
+
+func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return err
+ }
+ ctx := context.Background()
+ err = k.topic.Send(ctx, &pubsub.Message{
+ Body: bytes,
+ Metadata: map[string]string{"key": key},
+ })
+ if err != nil {
+ return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
+ }
+ return nil
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 83998a009..43d319398 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,10 +1,11 @@
package weed_server
import (
- "google.golang.org/grpc"
"net/http"
"os"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
@@ -15,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"