diff options
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/aws_sqs/aws_sqs_pub.go | 14 | ||||
| -rw-r--r-- | weed/notification/configuration.go | 12 | ||||
| -rw-r--r-- | weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 47 | ||||
| -rw-r--r-- | weed/notification/google_pub_sub/google_pub_sub.go | 12 | ||||
| -rw-r--r-- | weed/notification/kafka/kafka_queue.go | 10 | ||||
| -rw-r--r-- | weed/notification/log/log_queue.go | 2 |
6 files changed, 64 insertions, 33 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go index 4c1302abb..d881049dd 100644 --- a/weed/notification/aws_sqs/aws_sqs_pub.go +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -27,14 +27,14 @@ func (k *AwsSqsPub) GetName() string { return "aws_sqs" } -func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) - glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) +func (k *AwsSqsPub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("sqs_queue_name"), + configuration.GetString(prefix+"aws_access_key_id"), + configuration.GetString(prefix+"aws_secret_access_key"), + configuration.GetString(prefix+"region"), + configuration.GetString(prefix+"sqs_queue_name"), ) } diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go index 7f8765cc3..541a453e9 100644 --- a/weed/notification/configuration.go +++ b/weed/notification/configuration.go @@ -4,14 +4,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" - "github.com/spf13/viper" ) type MessageQueue interface { // GetName gets the name to locate the configuration in filer.toml file GetName() string // Initialize initializes the file store - Initialize(configuration util.Configuration) error + Initialize(configuration util.Configuration, prefix string) error SendMessage(key string, message proto.Message) error } @@ -21,7 +20,7 @@ var ( Queue MessageQueue ) -func LoadConfiguration(config *viper.Viper) { +func LoadConfiguration(config *util.ViperProxy, prefix string) { if config == nil { return @@ -30,9 +29,8 @@ func LoadConfiguration(config *viper.Viper) { validateOneEnabledQueue(config) for _, queue := range MessageQueues { - if config.GetBool(queue.GetName() + ".enabled") { - viperSub := config.Sub(queue.GetName()) - if err := queue.Initialize(viperSub); err != nil { + if config.GetBool(prefix + queue.GetName() + ".enabled") { + if err := queue.Initialize(config, prefix+queue.GetName()+"."); err != nil { glog.Fatalf("Failed to initialize notification for %s: %+v", queue.GetName(), err) } @@ -44,7 +42,7 @@ func LoadConfiguration(config *viper.Viper) { } -func validateOneEnabledQueue(config *viper.Viper) { +func validateOneEnabledQueue(config *util.ViperProxy) { enabledQueue := "" for _, queue := range MessageQueues { if config.GetBool(queue.GetName() + ".enabled") { diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index ebf44ea6f..01c4d901f 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -17,23 +17,34 @@ package gocdk_pub_sub import ( "context" "fmt" + "github.com/golang/protobuf/proto" + "github.com/streadway/amqp" + "gocloud.dev/pubsub" + _ "gocloud.dev/pubsub/awssnssqs" + "gocloud.dev/pubsub/rabbitpubsub" + "net/url" + "path" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" - "gocloud.dev/pubsub" - _ "gocloud.dev/pubsub/awssnssqs" // _ "gocloud.dev/pubsub/azuresb" _ "gocloud.dev/pubsub/gcppubsub" _ "gocloud.dev/pubsub/natspubsub" _ "gocloud.dev/pubsub/rabbitpubsub" + "os" ) func init() { notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{}) } +func getPath(rawUrl string) string { + parsedUrl, _ := url.Parse(rawUrl) + return path.Join(parsedUrl.Host, parsedUrl.Path) +} + type GoCDKPubSub struct { topicURL string topic *pubsub.Topic @@ -43,14 +54,37 @@ func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } -func (k *GoCDKPubSub) Initialize(config util.Configuration) error { - k.topicURL = config.GetString("topic_url") +func (k *GoCDKPubSub) doReconnect() { + var conn *amqp.Connection + if k.topic.As(&conn) { + go func() { + <-conn.NotifyClose(make(chan *amqp.Error)) + conn.Close() + k.topic.Shutdown(context.Background()) + for { + glog.Info("Try reconnect") + conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) + if err == nil { + k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil) + k.doReconnect() + break + } + glog.Error(err) + time.Sleep(time.Second) + } + }() + } +} + +func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string) error { + k.topicURL = configuration.GetString(prefix + "topic_url") glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) topic, err := pubsub.OpenTopic(context.Background(), k.topicURL) if err != nil { glog.Fatalf("Failed to open topic: %v", err) } k.topic = topic + k.doReconnect() return nil } @@ -59,8 +93,7 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error { if err != nil { return err } - ctx := context.Background() - err = k.topic.Send(ctx, &pubsub.Message{ + err = k.topic.Send(context.Background(), &pubsub.Message{ Body: bytes, Metadata: map[string]string{"key": key}, }) diff --git a/weed/notification/google_pub_sub/google_pub_sub.go b/weed/notification/google_pub_sub/google_pub_sub.go index 7b26bfe38..363a86eb6 100644 --- a/weed/notification/google_pub_sub/google_pub_sub.go +++ b/weed/notification/google_pub_sub/google_pub_sub.go @@ -25,13 +25,13 @@ func (k *GooglePubSub) GetName() string { return "google_pub_sub" } -func (k *GooglePubSub) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString("project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString("topic")) +func (k *GooglePubSub) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetString("google_application_credentials"), - configuration.GetString("project_id"), - configuration.GetString("topic"), + configuration.GetString(prefix+"google_application_credentials"), + configuration.GetString(prefix+"project_id"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go index fd545722b..8d83b5892 100644 --- a/weed/notification/kafka/kafka_queue.go +++ b/weed/notification/kafka/kafka_queue.go @@ -21,12 +21,12 @@ func (k *KafkaQueue) GetName() string { return "kafka" } -func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString("topic")) +func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) { + glog.V(0).Infof("filer.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + glog.V(0).Infof("filer.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), + configuration.GetStringSlice(prefix+"hosts"), + configuration.GetString(prefix+"topic"), ) } diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go index dcc038dfc..1ca4786a1 100644 --- a/weed/notification/log/log_queue.go +++ b/weed/notification/log/log_queue.go @@ -18,7 +18,7 @@ func (k *LogQueue) GetName() string { return "log" } -func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { +func (k *LogQueue) Initialize(configuration util.Configuration, prefix string) (err error) { return nil } |
