aboutsummaryrefslogtreecommitdiff
path: root/weed/notification
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2020-07-10 09:44:32 +0800
committerGitHub <noreply@github.com>2020-07-10 09:44:32 +0800
commite67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/notification
parent2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
downloadseaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/notification')
-rw-r--r--weed/notification/aws_sqs/aws_sqs_pub.go20
-rw-r--r--weed/notification/configuration.go9
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go71
-rw-r--r--weed/notification/google_pub_sub/google_pub_sub.go12
-rw-r--r--weed/notification/kafka/kafka_queue.go12
-rw-r--r--weed/notification/log/log_queue.go2
6 files changed, 98 insertions, 28 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go
index c1af7f27a..d881049dd 100644
--- a/weed/notification/aws_sqs/aws_sqs_pub.go
+++ b/weed/notification/aws_sqs/aws_sqs_pub.go
@@ -27,24 +27,24 @@ func (k *AwsSqsPub) GetName() string {
return "aws_sqs"
}
-func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("sqs_queue_name"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"sqs_queue_name"),
)
}
-func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+func (k *AwsSqsPub) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) {
config := &aws.Config{
Region: aws.String(region),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go
index 7f8765cc3..36211692c 100644
--- a/weed/notification/configuration.go
+++ b/weed/notification/configuration.go
@@ -11,7 +11,7 @@ type MessageQueue interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
SendMessage(key string, message proto.Message) error
}
@@ -21,7 +21,7 @@ var (
Queue MessageQueue
)
-func LoadConfiguration(config *viper.Viper) {
+func LoadConfiguration(config *viper.Viper, prefix string) {
if config == nil {
return
@@ -30,9 +30,8 @@ func LoadConfiguration(config *viper.Viper) {
validateOneEnabledQueue(config)
for _, queue := range MessageQueues {
- if config.GetBool(queue.GetName() + ".enabled") {
- viperSub := config.Sub(queue.GetName())
- if err := queue.Initialize(viperSub); err != nil {
+ if config.GetBool(prefix + queue.GetName() + ".enabled") {
+ if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification for %s: %+v",
queue.GetName(), err)
}
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
new file mode 100644
index 000000000..1ae102509
--- /dev/null
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -0,0 +1,71 @@
+// Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API,
+// which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus,
+// Google Cloud PubSub, and RabbitMQ.
+//
+// In the config, select a provider and topic using a URL. See
+// https://godoc.org/gocloud.dev/pubsub and its sub-packages for details.
+//
+// The Go CDK PubSub API does not support administrative operations like topic
+// creation. Create the topic using a UI, CLI or provider-specific API before running
+// weed.
+//
+// The Go CDK obtains credentials via environment variables and other
+// provider-specific default mechanisms. See the provider's documentation for
+// details.
+package gocdk_pub_sub
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+ "gocloud.dev/pubsub"
+ _ "gocloud.dev/pubsub/awssnssqs"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ // _ "gocloud.dev/pubsub/azuresb"
+ _ "gocloud.dev/pubsub/gcppubsub"
+ _ "gocloud.dev/pubsub/natspubsub"
+ _ "gocloud.dev/pubsub/rabbitpubsub"
+)
+
+func init() {
+ notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{})
+}
+
+type GoCDKPubSub struct {
+ topicURL string
+ topic *pubsub.Topic
+}
+
+func (k *GoCDKPubSub) GetName() string {
+ return "gocdk_pub_sub"
+}
+
+func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
+ k.topicURL = configuration.GetString(prefix + "topic_url")
+ glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
+ topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
+ if err != nil {
+ glog.Fatalf("Failed to open topic: %v", err)
+ }
+ k.topic = topic
+ return nil
+}
+
+func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error {
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return err
+ }
+ err = k.topic.Send(context.Background(), &pubsub.Message{
+ Body: bytes,
+ Metadata: map[string]string{"key": key},
+ })
+ if err != nil {
+ return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err)
+ }
+ return nil
+}
diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go
index 7b26bfe38..363a86eb6 100644
--- a/weed/notification/google_pub_sub/google_pub_sub.go
+++ b/weed/notification/google_pub_sub/google_pub_sub.go
@@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string {
return "google_pub_sub"
}
-func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("project_id"),
- configuration.GetString("topic"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"project_id"),
+ configuration.GetString(prefix+"topic"),
)
}
diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go
index 830709a51..8d83b5892 100644
--- a/weed/notification/kafka/kafka_queue.go
+++ b/weed/notification/kafka/kafka_queue.go
@@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string {
return "kafka"
}
-func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
- glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetStringSlice("hosts"),
- configuration.GetString("topic"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ configuration.GetString(prefix+"topic"),
)
}
@@ -76,7 +76,7 @@ func (k *KafkaQueue) handleError() {
for {
err := <-k.producer.Errors()
if err != nil {
- glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
+ glog.Errorf("producer message error, partition:%d offset:%d key:%v value:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
}
}
}
diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go
index dcc038dfc..1ca4786a1 100644
--- a/weed/notification/log/log_queue.go
+++ b/weed/notification/log/log_queue.go
@@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string {
return "log"
}
-func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
+func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
return nil
}