aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
commit4c97ff3717dc642fd2cad311a79df9ba266669cb (patch)
tree1c29716a587bb2f853439af6e0ebad88c906a82a
parent200cbcde628d814eef44570076885651b3a0ed24 (diff)
downloadseaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz
seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip
support AWS SQS as file change notification message queue
-rw-r--r--weed/command/filer_replication.go9
-rw-r--r--weed/command/scaffold.go15
-rw-r--r--weed/filer2/filer_notify_test.go51
-rw-r--r--weed/notification/aws_sqs/aws_sqs_pub.go91
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go4
-rw-r--r--weed/replication/sub/notification_aws_sqs.go111
-rw-r--r--weed/replication/sub/notification_kafka.go (renamed from weed/replication/notification_kafka.go)2
-rw-r--r--weed/replication/sub/notifications.go (renamed from weed/replication/notifications.go)2
-rw-r--r--weed/server/filer_server.go1
9 files changed, 282 insertions, 4 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 05076143a..3ae4f1e2f 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -13,6 +13,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/replication/sub"
)
func init() {
@@ -37,9 +38,9 @@ func runFilerReplicate(cmd *Command, args []string) bool {
weed_server.LoadConfiguration("replication", true)
config := viper.GetViper()
- var notificationInput replication.NotificationInput
+ var notificationInput sub.NotificationInput
- for _, input := range replication.NotificationInputs {
+ for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
viperSub := config.Sub("notification." + input.GetName())
if err := input.Initialize(viperSub); err != nil {
@@ -99,6 +100,10 @@ func runFilerReplicate(cmd *Command, args []string) bool {
glog.Errorf("receive %s: %+v", key, err)
continue
}
+ if key == "" {
+ // long poll received no messages
+ continue
+ }
if m.OldEntry != nil && m.NewEntry == nil {
glog.V(1).Infof("delete: %s", key)
} else if m.OldEntry == nil && m.NewEntry != nil {
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 95ddbd57c..cc6e5d6ef 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -146,6 +146,14 @@ hosts = [
]
topic = "seaweedfs_filer"
+[notification.aws_sqs]
+# experimental, let me know if it works
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue" # an existing queue name
+
`
REPLICATION_TOML_EXAMPLE = `
# A sample TOML config file for replicating SeaweedFS filer
@@ -169,6 +177,13 @@ topic = "seaweedfs_filer1_to_filer2"
offsetFile = "./last.offset"
offsetSaveIntervalSeconds = 10
+[notification.aws_sqs]
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue" # an existing queue name
+
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
new file mode 100644
index 000000000..ab54cd1a2
--- /dev/null
+++ b/weed/filer2/filer_notify_test.go
@@ -0,0 +1,51 @@
+package filer2
+
+import (
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/golang/protobuf/proto"
+)
+
+func TestProtoMarshalText(t *testing.T) {
+
+ oldEntry := &Entry{
+ FullPath: FullPath("/this/path/to"),
+ Attr: Attr{
+ Mtime: time.Now(),
+ Mode: 0644,
+ Uid: 1,
+ Mime: "text/json",
+ TtlSec: 25,
+ },
+ Chunks: []*filer_pb.FileChunk{
+ &filer_pb.FileChunk{
+ FileId: "234,2423423422",
+ Offset: 234234,
+ Size: 234,
+ Mtime: 12312423,
+ ETag: "2342342354",
+ SourceFileId: "23234,2342342342",
+ },
+ },
+ }
+
+ notification := &filer_pb.EventNotification{
+ OldEntry: toProtoEntry(oldEntry),
+ NewEntry: toProtoEntry(nil),
+ DeleteChunks: true,
+ }
+
+ text := proto.MarshalTextString(notification)
+
+ notification2 := &filer_pb.EventNotification{}
+ proto.UnmarshalText(text, notification2)
+
+ if notification2.OldEntry.Chunks[0].SourceFileId != notification.OldEntry.Chunks[0].SourceFileId {
+ t.Fatalf("marshal/unmarshal error: %s", text)
+ }
+
+ println(text)
+
+}
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go
new file mode 100644
index 000000000..ab8d28006
--- /dev/null
+++ b/weed/notification/aws_sqs/aws_sqs_pub.go
@@ -0,0 +1,91 @@
+package aws_sqs
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+)
+
+func init() {
+ notification.MessageQueues = append(notification.MessageQueues, &AwsSqsPub{})
+}
+
+type AwsSqsPub struct {
+ svc *sqs.SQS
+ queueUrl string
+}
+
+func (k *AwsSqsPub) GetName() string {
+ return "aws_sqs"
+}
+
+func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) {
+ glog.V(0).Infof("filer.notification.aws_sqs.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("filer.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+ return k.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("sqs_queue_name"),
+ )
+}
+
+func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && aswSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return fmt.Errorf("create aws session: %v", err)
+ }
+ k.svc = sqs.New(sess)
+
+ result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueName),
+ })
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
+ return fmt.Errorf("unable to find queue %s", queueName)
+ }
+ return fmt.Errorf("get queue %s url: %v", queueName, err)
+ }
+
+ k.queueUrl = *result.QueueUrl
+
+ return nil
+}
+
+func (k *AwsSqsPub) SendMessage(key string, message proto.Message) (err error) {
+
+ text := proto.MarshalTextString(message)
+
+ _, err = k.svc.SendMessage(&sqs.SendMessageInput{
+ DelaySeconds: aws.Int64(10),
+ MessageAttributes: map[string]*sqs.MessageAttributeValue{
+ "key": &sqs.MessageAttributeValue{
+ DataType: aws.String("String"),
+ StringValue: aws.String(key),
+ },
+ },
+ MessageBody: aws.String(text),
+ QueueUrl: &k.queueUrl,
+ })
+
+ if err != nil {
+ return fmt.Errorf("send message to sqs %s: %v", k.queueUrl, err)
+ }
+
+ return nil
+}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index b9caa839b..50146a57d 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
type S3Sink struct {
@@ -37,6 +38,9 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("sink.s3.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString("bucket"))
+ glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString("directory"))
return s3sink.initialize(
configuration.GetString("aws_access_key_id"),
configuration.GetString("aws_secret_access_key"),
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
new file mode 100644
index 000000000..fe1732e88
--- /dev/null
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -0,0 +1,111 @@
+package sub
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &AwsSqsInput{})
+}
+
+type AwsSqsInput struct {
+ svc *sqs.SQS
+ queueUrl string
+}
+
+func (k *AwsSqsInput) GetName() string {
+ return "aws_sqs"
+}
+
+func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+ return k.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("sqs_queue_name"),
+ )
+}
+
+func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && aswSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return fmt.Errorf("create aws session: %v", err)
+ }
+ k.svc = sqs.New(sess)
+
+ result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueName),
+ })
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
+ return fmt.Errorf("unable to find queue %s", queueName)
+ }
+ return fmt.Errorf("get queue %s url: %v", queueName, err)
+ }
+
+ k.queueUrl = *result.QueueUrl
+
+ return nil
+}
+
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ // receive message
+ result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ AttributeNames: []*string{
+ aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
+ },
+ MessageAttributeNames: []*string{
+ aws.String(sqs.QueueAttributeNameAll),
+ },
+ QueueUrl: &k.queueUrl,
+ MaxNumberOfMessages: aws.Int64(1),
+ VisibilityTimeout: aws.Int64(20), // 20 seconds
+ WaitTimeSeconds: aws.Int64(20),
+ })
+ if err != nil {
+ err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err)
+ return
+ }
+ if len(result.Messages) == 0 {
+ return
+ }
+
+ // process the message
+ key = *result.Messages[0].Attributes["key"]
+ text := *result.Messages[0].Body
+ message = &filer_pb.EventNotification{}
+ err = proto.UnmarshalText(text, message)
+
+ // delete the message
+ _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &k.queueUrl,
+ ReceiptHandle: result.Messages[0].ReceiptHandle,
+ })
+
+ if err != nil {
+ glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ }
+
+ return
+}
diff --git a/weed/replication/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 3bf917376..1a86a8307 100644
--- a/weed/replication/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -1,4 +1,4 @@
-package replication
+package sub
import (
"encoding/json"
diff --git a/weed/replication/notifications.go b/weed/replication/sub/notifications.go
index 6ae95d36b..66fbef824 100644
--- a/weed/replication/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -1,4 +1,4 @@
-package replication
+package sub
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2aabb9932..65fa26987 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"