diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
| commit | 4c97ff3717dc642fd2cad311a79df9ba266669cb (patch) | |
| tree | 1c29716a587bb2f853439af6e0ebad88c906a82a /weed/replication | |
| parent | 200cbcde628d814eef44570076885651b3a0ed24 (diff) | |
| download | seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip | |
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 4 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 111 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go (renamed from weed/replication/notification_kafka.go) | 2 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go (renamed from weed/replication/notifications.go) | 2 |
4 files changed, 117 insertions, 2 deletions
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index b9caa839b..50146a57d 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" ) type S3Sink struct { @@ -37,6 +38,9 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { } func (s3sink *S3Sink) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region")) + glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket")) + glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory")) return s3sink.initialize( configuration.GetString("aws_access_key_id"), configuration.GetString("aws_secret_access_key"), diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go new file mode 100644 index 000000000..fe1732e88 --- /dev/null +++ b/weed/replication/sub/notification_aws_sqs.go @@ -0,0 +1,111 @@ +package sub + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +func init() { + NotificationInputs = append(NotificationInputs, &AwsSqsInput{}) +} + +type AwsSqsInput struct { + svc *sqs.SQS + queueUrl string +} + +func (k *AwsSqsInput) GetName() string { + return "aws_sqs" +} + +func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) + return k.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("sqs_queue_name"), + ) +} + +func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && aswSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return fmt.Errorf("create aws session: %v", err) + } + k.svc = sqs.New(sess) + + result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + return fmt.Errorf("unable to find queue %s", queueName) + } + return fmt.Errorf("get queue %s url: %v", queueName, err) + } + + k.queueUrl = *result.QueueUrl + + return nil +} + +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + // receive message + result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: &k.queueUrl, + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(20), // 20 seconds + WaitTimeSeconds: aws.Int64(20), + }) + if err != nil { + err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err) + return + } + if len(result.Messages) == 0 { + return + } + + // process the message + key = *result.Messages[0].Attributes["key"] + text := *result.Messages[0].Body + message = &filer_pb.EventNotification{} + err = proto.UnmarshalText(text, message) + + // delete the message + _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &k.queueUrl, + ReceiptHandle: result.Messages[0].ReceiptHandle, + }) + + if err != nil { + glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) + } + + return +} diff --git a/weed/replication/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 3bf917376..1a86a8307 100644 --- a/weed/replication/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -1,4 +1,4 @@ -package replication +package sub import ( "encoding/json" diff --git a/weed/replication/notifications.go b/weed/replication/sub/notifications.go index 6ae95d36b..66fbef824 100644 --- a/weed/replication/notifications.go +++ b/weed/replication/sub/notifications.go @@ -1,4 +1,4 @@ -package replication +package sub import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" |
