aboutsummaryrefslogtreecommitdiff
path: root/weed/notification
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
commit4c97ff3717dc642fd2cad311a79df9ba266669cb (patch)
tree1c29716a587bb2f853439af6e0ebad88c906a82a /weed/notification
parent200cbcde628d814eef44570076885651b3a0ed24 (diff)
downloadseaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz
seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/notification')
-rw-r--r--weed/notification/aws_sqs/aws_sqs_pub.go91
1 files changed, 91 insertions, 0 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go
new file mode 100644
index 000000000..ab8d28006
--- /dev/null
+++ b/weed/notification/aws_sqs/aws_sqs_pub.go
@@ -0,0 +1,91 @@
+package aws_sqs
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+)
+
+func init() {
+ notification.MessageQueues = append(notification.MessageQueues, &AwsSqsPub{})
+}
+
+type AwsSqsPub struct {
+ svc *sqs.SQS
+ queueUrl string
+}
+
+func (k *AwsSqsPub) GetName() string {
+ return "aws_sqs"
+}
+
+func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) {
+ glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+ return k.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("sqs_queue_name"),
+ )
+}
+
+func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && aswSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return fmt.Errorf("create aws session: %v", err)
+ }
+ k.svc = sqs.New(sess)
+
+ result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueName),
+ })
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
+ return fmt.Errorf("unable to find queue %s", queueName)
+ }
+ return fmt.Errorf("get queue %s url: %v", queueName, err)
+ }
+
+ k.queueUrl = *result.QueueUrl
+
+ return nil
+}
+
+func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) {
+
+ text := proto.MarshalTextString(message)
+
+ _, err = k.svc.SendMessage(&sqs.SendMessageInput{
+ DelaySeconds: aws.Int64(10),
+ MessageAttributes: map[string]*sqs.MessageAttributeValue{
+ "key": &sqs.MessageAttributeValue{
+ DataType: aws.String("String"),
+ StringValue: aws.String(key),
+ },
+ },
+ MessageBody: aws.String(text),
+ QueueUrl: &k.queueUrl,
+ })
+
+ if err != nil {
+ return fmt.Errorf("send message to sqs %s: %v", k.queueUrl, err)
+ }
+
+ return nil
+}