diff options
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/configuration.go | 2 | ||||
| -rw-r--r-- | weed/notification/google_pub_sub/google_pub_sub.go | 89 |
2 files changed, 90 insertions, 1 deletions
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index e0ba61d58..68c31af77 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -35,7 +35,7 @@ func LoadConfiguration(config *viper.Viper) { store.GetName(), err) } Queue = store - glog.V(0).Infof("Configure message queue for %s", store.GetName()) + glog.V(0).Infof("Configure notification message queue for %s", store.GetName()) return } } diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go new file mode 100644 index 000000000..419fb26a4 --- /dev/null +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -0,0 +1,89 @@ +package google_pub_sub + +import ( + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/api/option" + "context" + "cloud.google.com/go/pubsub" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &GooglePubSub{}) +} + +type GooglePubSub struct { + topic *pubsub.Topic +} + +func (k *GooglePubSub) GetName() string { + return "google_pub_sub" +} + +func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) + return k.initialize( + configuration.GetString("google_application_credentials"), + configuration.GetString("project_id"), + configuration.GetString("topic"), + ) +} + +func (k *GooglePubSub) initialize(google_application_credentials, projectId, topicName string) (err error) { + + ctx := context.Background() + // Creates a client. + if google_application_credentials == "" { + var found bool + google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") + if !found { + glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") + } + } + + client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) + if err != nil { + glog.Fatalf("Failed to create client: %v", err) + } + + k.topic = client.Topic(topicName) + if exists, err := k.topic.Exists(ctx); err == nil { + if !exists { + k.topic, err = client.CreateTopic(ctx, topicName) + if err != nil { + glog.Fatalf("Failed to create topic %s: %v", topicName, err) + } + } + } else { + glog.Fatalf("Failed to check topic %s: %v", topicName, err) + } + + return nil +} + +func (k *GooglePubSub) SendMessage(key string, message proto.Message) (err error) { + + bytes, err := proto.Marshal(message) + if err != nil { + return + } + + ctx := context.Background() + result := k.topic.Publish(ctx, &pubsub.Message{ + Data: bytes, + Attributes: map[string]string{"key": key}, + }) + + _, err = result.Get(ctx) + if err != nil { + return fmt.Errorf("send message to google pub sub %s: %v", k.topic.String(), err) + } + + return nil +} |
