diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
| commit | 4c97ff3717dc642fd2cad311a79df9ba266669cb (patch) | |
| tree | 1c29716a587bb2f853439af6e0ebad88c906a82a /weed/notification | |
| parent | 200cbcde628d814eef44570076885651b3a0ed24 (diff) | |
| download | seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip | |
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/aws_sqs/aws_sqs_pub.go | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go new file mode 100644 index 000000000..ab8d28006 --- /dev/null +++ b/weed/notification/aws_sqs/aws_sqs_pub.go @@ -0,0 +1,91 @@ +package aws_sqs + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "fmt" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &AwsSqsPub{}) +} + +type AwsSqsPub struct { + svc *sqs.SQS + queueUrl string +} + +func (k *AwsSqsPub) GetName() string { + return "aws_sqs" +} + +func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) { + glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region")) + glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) + return k.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("sqs_queue_name"), + ) +} + +func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && aswSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return fmt.Errorf("create aws session: %v", err) + } + k.svc = sqs.New(sess) + + result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + return fmt.Errorf("unable to find queue %s", queueName) + } + return fmt.Errorf("get queue %s url: %v", queueName, err) + } + + k.queueUrl = *result.QueueUrl + + return nil +} + +func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) { + + text := proto.MarshalTextString(message) + + _, err = k.svc.SendMessage(&sqs.SendMessageInput{ + DelaySeconds: aws.Int64(10), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "key": &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(key), + }, + }, + MessageBody: aws.String(text), + QueueUrl: &k.queueUrl, + }) + + if err != nil { + return fmt.Errorf("send message to sqs %s: %v", k.queueUrl, err) + } + + return nil +} |
