aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sub
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-09 11:37:07 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-09 11:37:07 -0800
commit821c46edf10097200b986bd17dc01d3991cf57ff (patch)
treeca181a9ef3c2f7e45cf0dbb40373b87717a9a636 /weed/replication/sub
parent15da5834e1a33d060924740ba195f6bcd79f2af2 (diff)
parenta6e8d606b47e5f3e8cd8a57d2769d6f1404fbc8f (diff)
downloadseaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.tar.xz
seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.zip
Merge branch 'master' into support_ssd_volume
Diffstat (limited to 'weed/replication/sub')
-rw-r--r--weed/replication/sub/notification_aws_sqs.go2
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go103
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go10
-rw-r--r--weed/replication/sub/notification_kafka.go2
-rw-r--r--weed/replication/sub/notifications.go2
5 files changed, 110 insertions, 9 deletions
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 1dd386ba7..642834c72 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
return nil
}
-func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
// receive message
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9726096e5..413c0e3cf 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -2,13 +2,17 @@ package sub
import (
"context"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
+ "github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "net/url"
+ "path"
+ "strings"
+
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
@@ -19,6 +23,55 @@ func init() {
NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
}
+func getPath(rawUrl string) string {
+ parsedUrl, _ := url.Parse(rawUrl)
+ return path.Join(parsedUrl.Host, parsedUrl.Path)
+}
+
+func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error {
+ exchangeName := getPath(exchangeUrl)
+ queueName := getPath(queueUrl)
+ exchangeNameDLX := "DLX." + exchangeName
+ queueNameDLX := "DLX." + queueName
+ ch, err := conn.Channel()
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+ defer ch.Close()
+ if err := ch.ExchangeDeclare(
+ exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.ExchangeDeclare(
+ exchangeName, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueName, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueNameDLX, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ return nil
+}
+
type GoCDKPubSubInput struct {
sub *pubsub.Subscription
}
@@ -28,23 +81,65 @@ func (k *GoCDKPubSubInput) GetName() string {
}
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ topicUrl := configuration.GetString(prefix + "topic_url")
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {
return err
}
+ var conn *amqp.Connection
+ if sub.As(&conn) {
+ ch, err := conn.Channel()
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ _, err = ch.QueueInspect(getPath(subURL))
+ if err != nil {
+ if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
+ if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
+ return err
+ }
+ } else {
+ return err
+ }
+ }
+ }
k.sub = sub
return nil
}
-func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
+ if err != nil {
+ return
+ }
+ onFailureFn = func() {
+ if msg.Nackable() {
+ isRedelivered := false
+ var delivery amqp.Delivery
+ if msg.As(&delivery) {
+ isRedelivered = delivery.Redelivered
+ glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
+ }
+ if isRedelivered {
+ if err := delivery.Nack(false, false); err != nil {
+ glog.Error(err)
+ }
+ } else {
+ msg.Nack()
+ }
+ }
+ }
+ onSuccessFn = func() {
+ msg.Ack()
+ }
key = msg.Metadata["key"]
message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message)
if err != nil {
- return "", nil, err
+ return "", nil, onSuccessFn, onFailureFn, err
}
- return key, message, nil
+ return key, message, onSuccessFn, onFailureFn, nil
}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index a950bb42b..f7c767d4a 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
k.messageChan <- m
- m.Ack()
})
return err
}
-func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
m := <-k.messageChan
+ onSuccessFn = func() {
+ m.Ack()
+ }
+ onFailureFn = func() {
+ m.Nack()
+ }
+
// process the message
key = m.Attributes["key"]
message = &filer_pb.EventNotification{}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index fa9cfad9b..622a759ea 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
return nil
}
-func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg := <-k.messageChan
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 8a2668f98..d5a910db9 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -10,7 +10,7 @@ type NotificationInput interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error
- ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
}
var (