aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-26 11:08:44 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-26 11:08:44 -0800
commitad2a20c8a5827461d07ab2a46932d0c1c89d1b37 (patch)
tree951b46f31eb1d3fb2107efe062db99a1e99c10d1
parent3a1d3d3413c5fc6a49c138ed5024f2b907f63d37 (diff)
downloadseaweedfs-ad2a20c8a5827461d07ab2a46932d0c1c89d1b37.tar.xz
seaweedfs-ad2a20c8a5827461d07ab2a46932d0c1c89d1b37.zip
notification add ack and nack
-rw-r--r--weed/command/filer_replication.go14
-rw-r--r--weed/replication/sub/notification_aws_sqs.go2
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go17
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go10
-rw-r--r--weed/replication/sub/notification_kafka.go2
-rw-r--r--weed/replication/sub/notifications.go2
6 files changed, 38 insertions, 9 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 4f698e375..0b6eaf94e 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -97,13 +97,19 @@ func runFilerReplicate(cmd *Command, args []string) bool {
replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
- key, m, err := notificationInput.ReceiveMessage()
+ key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
if err != nil {
glog.Errorf("receive %s: %+v", key, err)
+ if onFailureFn != nil {
+ onFailureFn()
+ }
continue
}
if key == "" {
// long poll received no messages
+ if onSuccessFn != nil {
+ onSuccessFn()
+ }
continue
}
if m.OldEntry != nil && m.NewEntry == nil {
@@ -115,8 +121,14 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
if err = replicator.Replicate(context.Background(), key, m); err != nil {
glog.Errorf("replicate %s: %+v", key, err)
+ if onFailureFn != nil {
+ onFailureFn()
+ }
} else {
glog.V(1).Infof("replicated %s", key)
+ if onSuccessFn != nil {
+ onSuccessFn()
+ }
}
}
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 1dd386ba7..642834c72 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
return nil
}
-func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
// receive message
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9726096e5..09a96d5d5 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -38,13 +38,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return nil
}
-func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
+ if err != nil {
+ return
+ }
+ onSuccessFn = func() {
+ msg.Ack()
+ }
+ onFailureFn = func() {
+ if msg.Nackable() {
+ msg.Nack()
+ }
+ }
key = msg.Metadata["key"]
message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message)
if err != nil {
- return "", nil, err
+ return "", nil, onSuccessFn, onFailureFn, err
}
- return key, message, nil
+ return key, message, onSuccessFn, onFailureFn, nil
}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index a950bb42b..f7c767d4a 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
k.messageChan <- m
- m.Ack()
})
return err
}
-func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
m := <-k.messageChan
+ onSuccessFn = func() {
+ m.Ack()
+ }
+ onFailureFn = func() {
+ m.Nack()
+ }
+
// process the message
key = m.Attributes["key"]
message = &filer_pb.EventNotification{}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index fa9cfad9b..622a759ea 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
return nil
}
-func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg := <-k.messageChan
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 8a2668f98..d5a910db9 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -10,7 +10,7 @@ type NotificationInput interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error
- ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
}
var (