aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sub
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-31 01:11:19 -0700
commit4c97ff3717dc642fd2cad311a79df9ba266669cb (patch)
tree1c29716a587bb2f853439af6e0ebad88c906a82a /weed/replication/sub
parent200cbcde628d814eef44570076885651b3a0ed24 (diff)
downloadseaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz
seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/replication/sub')
-rw-r--r--weed/replication/sub/notification_aws_sqs.go111
-rw-r--r--weed/replication/sub/notification_kafka.go158
-rw-r--r--weed/replication/sub/notifications.go18
3 files changed, 287 insertions, 0 deletions
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
new file mode 100644
index 000000000..fe1732e88
--- /dev/null
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -0,0 +1,111 @@
+package sub
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &AwsSqsInput{})
+}
+
+type AwsSqsInput struct {
+ svc *sqs.SQS
+ queueUrl string
+}
+
+func (k *AwsSqsInput) GetName() string {
+ return "aws_sqs"
+}
+
+func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name"))
+ return k.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("sqs_queue_name"),
+ )
+}
+
+func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && aswSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return fmt.Errorf("create aws session: %v", err)
+ }
+ k.svc = sqs.New(sess)
+
+ result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{
+ QueueName: aws.String(queueName),
+ })
+ if err != nil {
+ if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist {
+ return fmt.Errorf("unable to find queue %s", queueName)
+ }
+ return fmt.Errorf("get queue %s url: %v", queueName, err)
+ }
+
+ k.queueUrl = *result.QueueUrl
+
+ return nil
+}
+
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ // receive message
+ result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
+ AttributeNames: []*string{
+ aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
+ },
+ MessageAttributeNames: []*string{
+ aws.String(sqs.QueueAttributeNameAll),
+ },
+ QueueUrl: &k.queueUrl,
+ MaxNumberOfMessages: aws.Int64(1),
+ VisibilityTimeout: aws.Int64(20), // 20 seconds
+ WaitTimeSeconds: aws.Int64(20),
+ })
+ if err != nil {
+ err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err)
+ return
+ }
+ if len(result.Messages) == 0 {
+ return
+ }
+
+ // process the message
+ key = *result.Messages[0].Attributes["key"]
+ text := *result.Messages[0].Body
+ message = &filer_pb.EventNotification{}
+ err = proto.UnmarshalText(text, message)
+
+ // delete the message
+ _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{
+ QueueUrl: &k.queueUrl,
+ ReceiptHandle: result.Messages[0].ReceiptHandle,
+ })
+
+ if err != nil {
+ glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ }
+
+ return
+}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
new file mode 100644
index 000000000..1a86a8307
--- /dev/null
+++ b/weed/replication/sub/notification_kafka.go
@@ -0,0 +1,158 @@
+package sub
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "sync"
+ "time"
+
+ "github.com/Shopify/sarama"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+)
+
+func init() {
+ NotificationInputs = append(NotificationInputs, &KafkaInput{})
+}
+
+type KafkaInput struct {
+ topic string
+ consumer sarama.Consumer
+ messageChan chan *sarama.ConsumerMessage
+}
+
+func (k *KafkaInput) GetName() string {
+ return "kafka"
+}
+
+func (k *KafkaInput) Initialize(configuration util.Configuration) error {
+ glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
+ glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic"))
+ return k.initialize(
+ configuration.GetStringSlice("hosts"),
+ configuration.GetString("topic"),
+ configuration.GetString("offsetFile"),
+ configuration.GetInt("offsetSaveIntervalSeconds"),
+ )
+}
+
+func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) {
+ config := sarama.NewConfig()
+ config.Consumer.Return.Errors = true
+ k.consumer, err = sarama.NewConsumer(hosts, config)
+ if err != nil {
+ panic(err)
+ } else {
+ glog.V(0).Infof("connected to %v", hosts)
+ }
+
+ k.topic = topic
+ k.messageChan = make(chan *sarama.ConsumerMessage, 1)
+
+ partitions, err := k.consumer.Partitions(topic)
+ if err != nil {
+ panic(err)
+ }
+
+ progress := loadProgress(offsetFile)
+ if progress == nil || progress.Topic != topic {
+ progress = &KafkaProgress{
+ Topic: topic,
+ PartitionOffsets: make(map[int32]int64),
+ }
+ }
+ progress.lastSaveTime = time.Now()
+ progress.offsetFile = offsetFile
+ progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds
+
+ for _, partition := range partitions {
+ offset, found := progress.PartitionOffsets[partition]
+ if !found {
+ offset = sarama.OffsetOldest
+ } else {
+ offset += 1
+ }
+ partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset)
+ if err != nil {
+ panic(err)
+ }
+ go func() {
+ for {
+ select {
+ case err := <-partitionConsumer.Errors():
+ fmt.Println(err)
+ case msg := <-partitionConsumer.Messages():
+ k.messageChan <- msg
+ if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
+ glog.Warningf("set kafka offset: %v", err)
+ }
+ }
+ }
+ }()
+ }
+
+ return nil
+}
+
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+
+ msg := <-k.messageChan
+
+ key = string(msg.Key)
+ message = &filer_pb.EventNotification{}
+ err = proto.Unmarshal(msg.Value, message)
+
+ return
+}
+
+type KafkaProgress struct {
+ Topic string `json:"topic"`
+ PartitionOffsets map[int32]int64 `json:"partitionOffsets"`
+ offsetFile string
+ lastSaveTime time.Time
+ offsetSaveIntervalSeconds int
+ sync.Mutex
+}
+
+func loadProgress(offsetFile string) *KafkaProgress {
+ progress := &KafkaProgress{}
+ data, err := ioutil.ReadFile(offsetFile)
+ if err != nil {
+ glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ return nil
+ }
+ err = json.Unmarshal(data, progress)
+ if err != nil {
+ glog.Warningf("failed to read kafka progress message: %s", string(data))
+ return nil
+ }
+ return progress
+}
+
+func (progress *KafkaProgress) saveProgress() error {
+ data, err := json.Marshal(progress)
+ if err != nil {
+ return fmt.Errorf("failed to marshal progress: %v", err)
+ }
+ err = ioutil.WriteFile(progress.offsetFile, data, 0640)
+ if err != nil {
+ return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err)
+ }
+
+ progress.lastSaveTime = time.Now()
+ return nil
+}
+
+func (progress *KafkaProgress) setOffset(parition int32, offset int64) error {
+ progress.Lock()
+ defer progress.Unlock()
+
+ progress.PartitionOffsets[parition] = offset
+ if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds {
+ return progress.saveProgress()
+ }
+ return nil
+}
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
new file mode 100644
index 000000000..66fbef824
--- /dev/null
+++ b/weed/replication/sub/notifications.go
@@ -0,0 +1,18 @@
+package sub
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type NotificationInput interface {
+ // GetName gets the name to locate the configuration in sync.toml file
+ GetName() string
+ // Initialize initializes the file store
+ Initialize(configuration util.Configuration) error
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+}
+
+var (
+ NotificationInputs []NotificationInput
+)