diff options
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/aws_sqs/aws_sqs_pub.go | 14 | ||||
| -rw-r--r-- | weed/notification/configuration.go | 9 | ||||
| -rw-r--r-- | weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 11 | ||||
| -rw-r--r-- | weed/notification/google_pub_sub/google_pub_sub.go | 12 | ||||
| -rw-r--r-- | weed/notification/kafka/kafka_queue.go | 10 | ||||
| -rw-r--r-- | weed/notification/log/log_queue.go | 2 |
6 files changed, 29 insertions, 29 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go index 4c1302abb..d881049dd 100644 --- a/weed/notification/aws_sqs/aws_sqs_pub.go +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -27,14 +27,14 @@ func (k *AwsSqsPub) GetName() string { return "aws_sqs" } -func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index 7f8765cc3..36211692c 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -11,7 +11,7 @@ type MessageQueue interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error SendMessage(key string, message proto.Message) error } @@ -21,7 +21,7 @@ var ( Queue MessageQueue ) -func LoadConfiguration(config *viper.Viper) { +func LoadConfiguration(config *viper.Viper, prefix string) { if config == nil { return @@ -30,9 +30,8 @@ func LoadConfiguration(config *viper.Viper) { validateOneEnabledQueue(config) for _, queue := range MessageQueues { - if config.GetBool(queue.GetName() + ".enabled") { - viperSub := config.Sub(queue.GetName()) - if err := queue.Initialize(viperSub); err != nil { + if config.GetBool(prefix + queue.GetName() + ".enabled") { + if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification for %s: %+v", queue.GetName(), err) } diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index ebf44ea6f..706261b3a 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -18,12 +18,13 @@ import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/notification" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "gocloud.dev/pubsub" _ "gocloud.dev/pubsub/awssnssqs" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" @@ -43,8 +44,8 @@ func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSub) Initialize(config util.Configuration) error { - k.topicURL = config.GetString("topic_url") +func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { + k.topicURL = configuration.GetString(prefix + "topic_url") glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) topic, err := pubsub.OpenTopic(context.Background(), k.topicURL) if err != nil { diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go index 7b26bfe38..363a86eb6 100644 --- a/weed/notification/google_pub_sub/google_pub_sub.go +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string { return "google_pub_sub" } -func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go index fd545722b..8d83b5892 100644 --- a/weed/notification/kafka/kafka_queue.go +++ b/weed/notification/kafka/kafka_queue.go @@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string { return "kafka" } -func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go index dcc038dfc..1ca4786a1 100644 --- a/weed/notification/log/log_queue.go +++ b/weed/notification/log/log_queue.go @@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string { return "log" } -func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { +func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) { return nil } |
