aboutsummaryrefslogtreecommitdiff
path: root/weed/notification
diff options
context:
space:
mode:
Diffstat (limited to 'weed/notification')
-rw-r--r--weed/notification/configuration.go2
-rw-r--r--weed/notification/google_pub_sub/google_pub_sub.go89
2 files changed, 90 insertions, 1 deletions
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go
index e0ba61d58..68c31af77 100644
--- a/weed/notification/configuration.go
+++ b/weed/notification/configuration.go
@@ -35,7 +35,7 @@ func LoadConfiguration(config *viper.Viper) {
store.GetName(), err)
}
Queue = store
- glog.V(0).Infof("Configure message queue for %s", store.GetName())
+ glog.V(0).Infof("Configure notification message queue for %s", store.GetName())
return
}
}
diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go
new file mode 100644
index 000000000..419fb26a4
--- /dev/null
+++ b/weed/notification/google_pub_sub/google_pub_sub.go
@@ -0,0 +1,89 @@
+package google_pub_sub
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/api/option"
+ "context"
+ "cloud.google.com/go/pubsub"
+)
+
+func init() {
+ notification.MessageQueues = append(notification.MessageQueues, &GooglePubSub{})
+}
+
+type GooglePubSub struct {
+ topic *pubsub.Topic
+}
+
+func (k *GooglePubSub) GetName() string {
+ return "google_pub_sub"
+}
+
+func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) {
+ glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
+ glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+ return k.initialize(
+ configuration.GetString("google_application_credentials"),
+ configuration.GetString("project_id"),
+ configuration.GetString("topic"),
+ )
+}
+
+func (k *GooglePubSub) initialize(google_application_credentials, projectId, topicName string) (err error) {
+
+ ctx := context.Background()
+ // Creates a client.
+ if google_application_credentials == "" {
+ var found bool
+ google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
+ if !found {
+ glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
+ }
+ }
+
+ client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials))
+ if err != nil {
+ glog.Fatalf("Failed to create client: %v", err)
+ }
+
+ k.topic = client.Topic(topicName)
+ if exists, err := k.topic.Exists(ctx); err == nil {
+ if !exists {
+ k.topic, err = client.CreateTopic(ctx, topicName)
+ if err != nil {
+ glog.Fatalf("Failed to create topic %s: %v", topicName, err)
+ }
+ }
+ } else {
+ glog.Fatalf("Failed to check topic %s: %v", topicName, err)
+ }
+
+ return nil
+}
+
+func (k *GooglePubSub) SendMessage(key string, message proto.Message) (err error) {
+
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return
+ }
+
+ ctx := context.Background()
+ result := k.topic.Publish(ctx, &pubsub.Message{
+ Data: bytes,
+ Attributes: map[string]string{"key": key},
+ })
+
+ _, err = result.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("send message to google pub sub %s: %v", k.topic.String(), err)
+ }
+
+ return nil
+}