diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-10-31 01:11:19 -0700 |
| commit | 4c97ff3717dc642fd2cad311a79df9ba266669cb (patch) | |
| tree | 1c29716a587bb2f853439af6e0ebad88c906a82a /weed/replication/sub | |
| parent | 200cbcde628d814eef44570076885651b3a0ed24 (diff) | |
| download | seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.tar.xz seaweedfs-4c97ff3717dc642fd2cad311a79df9ba266669cb.zip | |
support AWS SQS as file change notification message queue
Diffstat (limited to 'weed/replication/sub')
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 111 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 158 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go | 18 |
3 files changed, 287 insertions, 0 deletions
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go new file mode 100644 index 000000000..fe1732e88 --- /dev/null +++ b/weed/replication/sub/notification_aws_sqs.go @@ -0,0 +1,111 @@ +package sub + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/aws/awserr" +) + +func init() { + NotificationInputs = append(NotificationInputs, &AwsSqsInput{}) +} + +type AwsSqsInput struct { + svc *sqs.SQS + queueUrl string +} + +func (k *AwsSqsInput) GetName() string { + return "aws_sqs" +} + +func (k *AwsSqsInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString("region")) + glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString("sqs_queue_name")) + return k.initialize( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region"), + configuration.GetString("sqs_queue_name"), + ) +} + +func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) { + + config := &aws.Config{ + Region: aws.String(region), + } + if awsAccessKeyId != "" && aswSecretAccessKey != "" { + config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return fmt.Errorf("create aws session: %v", err) + } + k.svc = sqs.New(sess) + + result, err := k.svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + if aerr, ok := err.(awserr.Error); ok && aerr.Code() == sqs.ErrCodeQueueDoesNotExist { + return fmt.Errorf("unable to find queue %s", queueName) + } + return fmt.Errorf("get queue %s url: %v", queueName, err) + } + + k.queueUrl = *result.QueueUrl + + return nil +} + +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + // receive message + result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + AttributeNames: []*string{ + aws.String(sqs.MessageSystemAttributeNameSentTimestamp), + }, + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + QueueUrl: &k.queueUrl, + MaxNumberOfMessages: aws.Int64(1), + VisibilityTimeout: aws.Int64(20), // 20 seconds + WaitTimeSeconds: aws.Int64(20), + }) + if err != nil { + err = fmt.Errorf("receive message from sqs %s: %v", k.queueUrl, err) + return + } + if len(result.Messages) == 0 { + return + } + + // process the message + key = *result.Messages[0].Attributes["key"] + text := *result.Messages[0].Body + message = &filer_pb.EventNotification{} + err = proto.UnmarshalText(text, message) + + // delete the message + _, err = k.svc.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: &k.queueUrl, + ReceiptHandle: result.Messages[0].ReceiptHandle, + }) + + if err != nil { + glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) + } + + return +} diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go new file mode 100644 index 000000000..1a86a8307 --- /dev/null +++ b/weed/replication/sub/notification_kafka.go @@ -0,0 +1,158 @@ +package sub + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "sync" + "time" + + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +func init() { + NotificationInputs = append(NotificationInputs, &KafkaInput{}) +} + +type KafkaInput struct { + topic string + consumer sarama.Consumer + messageChan chan *sarama.ConsumerMessage +} + +func (k *KafkaInput) GetName() string { + return "kafka" +} + +func (k *KafkaInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + configuration.GetString("offsetFile"), + configuration.GetInt("offsetSaveIntervalSeconds"), + ) +} + +func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, offsetSaveIntervalSeconds int) (err error) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + k.consumer, err = sarama.NewConsumer(hosts, config) + if err != nil { + panic(err) + } else { + glog.V(0).Infof("connected to %v", hosts) + } + + k.topic = topic + k.messageChan = make(chan *sarama.ConsumerMessage, 1) + + partitions, err := k.consumer.Partitions(topic) + if err != nil { + panic(err) + } + + progress := loadProgress(offsetFile) + if progress == nil || progress.Topic != topic { + progress = &KafkaProgress{ + Topic: topic, + PartitionOffsets: make(map[int32]int64), + } + } + progress.lastSaveTime = time.Now() + progress.offsetFile = offsetFile + progress.offsetSaveIntervalSeconds = offsetSaveIntervalSeconds + + for _, partition := range partitions { + offset, found := progress.PartitionOffsets[partition] + if !found { + offset = sarama.OffsetOldest + } else { + offset += 1 + } + partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, offset) + if err != nil { + panic(err) + } + go func() { + for { + select { + case err := <-partitionConsumer.Errors(): + fmt.Println(err) + case msg := <-partitionConsumer.Messages(): + k.messageChan <- msg + if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { + glog.Warningf("set kafka offset: %v", err) + } + } + } + }() + } + + return nil +} + +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + msg := <-k.messageChan + + key = string(msg.Key) + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(msg.Value, message) + + return +} + +type KafkaProgress struct { + Topic string `json:"topic"` + PartitionOffsets map[int32]int64 `json:"partitionOffsets"` + offsetFile string + lastSaveTime time.Time + offsetSaveIntervalSeconds int + sync.Mutex +} + +func loadProgress(offsetFile string) *KafkaProgress { + progress := &KafkaProgress{} + data, err := ioutil.ReadFile(offsetFile) + if err != nil { + glog.Warningf("failed to read kafka progress file: %s", offsetFile) + return nil + } + err = json.Unmarshal(data, progress) + if err != nil { + glog.Warningf("failed to read kafka progress message: %s", string(data)) + return nil + } + return progress +} + +func (progress *KafkaProgress) saveProgress() error { + data, err := json.Marshal(progress) + if err != nil { + return fmt.Errorf("failed to marshal progress: %v", err) + } + err = ioutil.WriteFile(progress.offsetFile, data, 0640) + if err != nil { + return fmt.Errorf("failed to save progress to %s: %v", progress.offsetFile, err) + } + + progress.lastSaveTime = time.Now() + return nil +} + +func (progress *KafkaProgress) setOffset(parition int32, offset int64) error { + progress.Lock() + defer progress.Unlock() + + progress.PartitionOffsets[parition] = offset + if int(time.Now().Sub(progress.lastSaveTime).Seconds()) > progress.offsetSaveIntervalSeconds { + return progress.saveProgress() + } + return nil +} diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go new file mode 100644 index 000000000..66fbef824 --- /dev/null +++ b/weed/replication/sub/notifications.go @@ -0,0 +1,18 @@ +package sub + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type NotificationInput interface { + // GetName gets the name to locate the configuration in sync.toml file + GetName() string + // Initialize initializes the file store + Initialize(configuration util.Configuration) error + ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) +} + +var ( + NotificationInputs []NotificationInput +) |
