aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
commit4c97ff3717dc642fd2cad311a79df9ba266669cb (patch)
tree1c29716a587bb2f853439af6e0ebad88c906a82a /weed/replication
parent200cbcde628d814eef44570076885651b3a0ed24 (diff)
downloadseaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz
seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go4
-rw-r--r--weed/replication/sub/notification_aws_sqs.go111
-rw-r--r--weed/replication/sub/notification_kafka.go (renamed from weed/replication/notification_kafka.go)2
-rw-r--r--weed/replication/sub/notifications.go (renamed from weed/replication/notifications.go)2
4 files changed, 117 insertions, 2 deletions
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index b9caa839b..50146a57d 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
type S3Sink struct {
@@ -37,6 +38,9 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
+ glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
return s3sink.initialize(
configuration.GetString("aws_access_key_id"),
configuration.GetString("aws_secret_access_key"),
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
new file mode 100644
index 000000000..fe1732e88
--- /dev/null
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -0,0 +1,111 @@
+package sub
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &AwsSqsInput{})
+}
+
+type AwsSqsInput struct {
+ svc *sqs.SQS
+ queueUrl string
+}
+
+func (k *AwsSqsInput) GetName() string {
+ return "aws_sqs"
+}
+
+func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+ return k.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("sqs_queue_name"),
+ )
+}
+
+func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && aswSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return fmt.Errorf("create aws session: %v", err)
+ }
+ k.svc = sqs.New(sess)
+
+ result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueName),
+ })
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
+ return fmt.Errorf("unable to find queue %s", queueName)
+ }
+ return fmt.Errorf("get queue %s url: %v", queueName, err)
+ }
+
+ k.queueUrl = *result.QueueUrl
+
+ return nil
+}
+
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ // receive message
+ result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ AttributeNames: []*string{
+ aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
+ },
+ MessageAttributeNames: []*string{
+ aws.String(sqs.QueueAttributeNameAll),
+ },
+ QueueUrl: &k.queueUrl,
+ MaxNumberOfMessages: aws.Int64(1),
+ VisibilityTimeout: aws.Int64(20), // 20 seconds
+ WaitTimeSeconds: aws.Int64(20),
+ })
+ if err != nil {
+ err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err)
+ return
+ }
+ if len(result.Messages) == 0 {
+ return
+ }
+
+ // process the message
+ key = *result.Messages[0].Attributes["key"]
+ text := *result.Messages[0].Body
+ message = &filer_pb.EventNotification{}
+ err = proto.UnmarshalText(text, message)
+
+ // delete the message
+ _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &k.queueUrl,
+ ReceiptHandle: result.Messages[0].ReceiptHandle,
+ })
+
+ if err != nil {
+ glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ }
+
+ return
+}
diff --git a/weed/replication/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 3bf917376..1a86a8307 100644
--- a/weed/replication/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -1,4 +1,4 @@
-package replication
+package sub
import (
"encoding/json"
diff --git a/weed/replication/notifications.go b/weed/replication/sub/notifications.go
index 6ae95d36b..66fbef824 100644
--- a/weed/replication/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -1,4 +1,4 @@
-package replication
+package sub
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"