diff options
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/aws_sqs/aws_sqs_pub.go | 20 | ||||
| -rw-r--r-- | weed/notification/configuration.go | 9 | ||||
| -rw-r--r-- | weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 71 | ||||
| -rw-r--r-- | weed/notification/google_pub_sub/google_pub_sub.go | 12 | ||||
| -rw-r--r-- | weed/notification/kafka/kafka_queue.go | 12 | ||||
| -rw-r--r-- | weed/notification/log/log_queue.go | 2 |
6 files changed, 98 insertions, 28 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go index c1af7f27a..d881049dd 100644 --- a/weed/notification/aws_sqs/aws_sqs_pub.go +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -27,24 +27,24 @@ func (k *AwsSqsPub) GetName() string { return "aws_sqs" } -func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } -func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { +func (k *AwsSqsPub) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) { config := &aws.Config{ Region: aws.String(region), } - if awsAccessKeyId != "" && aswSecretAccessKey != "" { - config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + if awsAccessKeyId != "" && awsSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") } sess, err := session.NewSession(config) diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index 7f8765cc3..36211692c 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -11,7 +11,7 @@ type MessageQueue interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error SendMessage(key string, message proto.Message) error } @@ -21,7 +21,7 @@ var ( Queue MessageQueue ) -func LoadConfiguration(config *viper.Viper) { +func LoadConfiguration(config *viper.Viper, prefix string) { if config == nil { return @@ -30,9 +30,8 @@ func LoadConfiguration(config *viper.Viper) { validateOneEnabledQueue(config) for _, queue := range MessageQueues { - if config.GetBool(queue.GetName() + ".enabled") { - viperSub := config.Sub(queue.GetName()) - if err := queue.Initialize(viperSub); err != nil { + if config.GetBool(prefix + queue.GetName() + ".enabled") { + if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification for %s: %+v", queue.GetName(), err) } diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go new file mode 100644 index 000000000..1ae102509 --- /dev/null +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -0,0 +1,71 @@ +// Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API, +// which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus, +// Google Cloud PubSub, and RabbitMQ. +// +// In the config, select a provider and topic using a URL. See +// https://godoc.org/gocloud.dev/pubsub and its sub-packages for details. +// +// The Go CDK PubSub API does not support administrative operations like topic +// creation. Create the topic using a UI, CLI or provider-specific API before running +// weed. +// +// The Go CDK obtains credentials via environment variables and other +// provider-specific default mechanisms. See the provider's documentation for +// details. +package gocdk_pub_sub + +import ( + "context" + "fmt" + + "github.com/golang/protobuf/proto" + "gocloud.dev/pubsub" + _ "gocloud.dev/pubsub/awssnssqs" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + // _ "gocloud.dev/pubsub/azuresb" + _ "gocloud.dev/pubsub/gcppubsub" + _ "gocloud.dev/pubsub/natspubsub" + _ "gocloud.dev/pubsub/rabbitpubsub" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{}) +} + +type GoCDKPubSub struct { + topicURL string + topic *pubsub.Topic +} + +func (k *GoCDKPubSub) GetName() string { + return "gocdk_pub_sub" +} + +func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { + k.topicURL = configuration.GetString(prefix + "topic_url") + glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) + topic, err := pubsub.OpenTopic(context.Background(), k.topicURL) + if err != nil { + glog.Fatalf("Failed to open topic: %v", err) + } + k.topic = topic + return nil +} + +func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error { + bytes, err := proto.Marshal(message) + if err != nil { + return err + } + err = k.topic.Send(context.Background(), &pubsub.Message{ + Body: bytes, + Metadata: map[string]string{"key": key}, + }) + if err != nil { + return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err) + } + return nil +} diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go index 7b26bfe38..363a86eb6 100644 --- a/weed/notification/google_pub_sub/google_pub_sub.go +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string { return "google_pub_sub" } -func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go index 830709a51..8d83b5892 100644 --- a/weed/notification/kafka/kafka_queue.go +++ b/weed/notification/kafka/kafka_queue.go @@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string { return "kafka" } -func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), ) } @@ -76,7 +76,7 @@ func (k *KafkaQueue) handleError() { for { err := <-k.producer.Errors() if err != nil { - glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic) + glog.Errorf("producer message error, partition:%d offset:%d key:%v value:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic) } } } diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go index dcc038dfc..1ca4786a1 100644 --- a/weed/notification/log/log_queue.go +++ b/weed/notification/log/log_queue.go @@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string { return "log" } -func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { +func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) { return nil } |
