aboutsummaryrefslogtreecommitdiff
path: root/weed/notification
diff options
context:
space:
mode:
Diffstat (limited to 'weed/notification')
-rw-r--r--weed/notification/aws_sqs/aws_sqs_pub.go14
-rw-r--r--weed/notification/configuration.go9
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go11
-rw-r--r--weed/notification/google_pub_sub/google_pub_sub.go12
-rw-r--r--weed/notification/kafka/kafka_queue.go10
-rw-r--r--weed/notification/log/log_queue.go2
6 files changed, 29 insertions, 29 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go
index 4c1302abb..d881049dd 100644
--- a/weed/notification/aws_sqs/aws_sqs_pub.go
+++ b/weed/notification/aws_sqs/aws_sqs_pub.go
@@ -27,14 +27,14 @@ func (k *AwsSqsPub) GetName() string {
return "aws_sqs"
}
-func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("sqs_queue_name"),
+ configuration.GetString(prefix+"aws_access_key_id"),
+ configuration.GetString(prefix+"aws_secret_access_key"),
+ configuration.GetString(prefix+"region"),
+ configuration.GetString(prefix+"sqs_queue_name"),
)
}
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go
index 7f8765cc3..36211692c 100644
--- a/weed/notification/configuration.go
+++ b/weed/notification/configuration.go
@@ -11,7 +11,7 @@ type MessageQueue interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
// Initialize initializes the file store
- Initialize(configuration util.Configuration) error
+ Initialize(configuration util.Configuration, prefix string) error
SendMessage(key string, message proto.Message) error
}
@@ -21,7 +21,7 @@ var (
Queue MessageQueue
)
-func LoadConfiguration(config *viper.Viper) {
+func LoadConfiguration(config *viper.Viper, prefix string) {
if config == nil {
return
@@ -30,9 +30,8 @@ func LoadConfiguration(config *viper.Viper) {
validateOneEnabledQueue(config)
for _, queue := range MessageQueues {
- if config.GetBool(queue.GetName() + ".enabled") {
- viperSub := config.Sub(queue.GetName())
- if err := queue.Initialize(viperSub); err != nil {
+ if config.GetBool(prefix + queue.GetName() + ".enabled") {
+ if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification for %s: %+v",
queue.GetName(), err)
}
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index ebf44ea6f..706261b3a 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -18,12 +18,13 @@ import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/notification"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
@@ -43,8 +44,8 @@ func (k *GoCDKPubSub) GetName() string {
return "gocdk_pub_sub"
}
-func (k *GoCDKPubSub) Initialize(config util.Configuration) error {
- k.topicURL = config.GetString("topic_url")
+func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error {
+ k.topicURL = configuration.GetString(prefix + "topic_url")
glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL)
topic, err := pubsub.OpenTopic(context.Background(), k.topicURL)
if err != nil {
diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go
index 7b26bfe38..363a86eb6 100644
--- a/weed/notification/google_pub_sub/google_pub_sub.go
+++ b/weed/notification/google_pub_sub/google_pub_sub.go
@@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string {
return "google_pub_sub"
}
-func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic"))
+func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetString("google_application_credentials"),
- configuration.GetString("project_id"),
- configuration.GetString("topic"),
+ configuration.GetString(prefix+"google_application_credentials"),
+ configuration.GetString(prefix+"project_id"),
+ configuration.GetString(prefix+"topic"),
)
}
diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go
index fd545722b..8d83b5892 100644
--- a/weed/notification/kafka/kafka_queue.go
+++ b/weed/notification/kafka/kafka_queue.go
@@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string {
return "kafka"
}
-func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) {
- glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
- glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
+ glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
- configuration.GetStringSlice("hosts"),
- configuration.GetString("topic"),
+ configuration.GetStringSlice(prefix+"hosts"),
+ configuration.GetString(prefix+"topic"),
)
}
diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go
index dcc038dfc..1ca4786a1 100644
--- a/weed/notification/log/log_queue.go
+++ b/weed/notification/log/log_queue.go
@@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string {
return "log"
}
-func (k *LogQueue) Initialize(configuration util.Configuration) (err error) {
+func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
return nil
}