aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/replication/repl_util/replication_utli.go10
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go94
2 files changed, 95 insertions, 9 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index 42777f4ad..c5b8ab4e1 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -17,9 +17,10 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
}
var writeErr error
+ var shouldRetry bool
for _, fileUrl := range fileUrls {
- _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
@@ -30,11 +31,12 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
break
}
}
-
- if err != nil {
+ if shouldRetry && err != nil {
return err
}
-
+ if writeErr != nil {
+ return writeErr
+ }
}
return nil
}
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 09a96d5d5..413c0e3cf 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -2,13 +2,17 @@ package sub
import (
"context"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
+ "github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "net/url"
+ "path"
+ "strings"
+
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
@@ -19,6 +23,55 @@ func init() {
NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
}
+func getPath(rawUrl string) string {
+ parsedUrl, _ := url.Parse(rawUrl)
+ return path.Join(parsedUrl.Host, parsedUrl.Path)
+}
+
+func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error {
+ exchangeName := getPath(exchangeUrl)
+ queueName := getPath(queueUrl)
+ exchangeNameDLX := "DLX." + exchangeName
+ queueNameDLX := "DLX." + queueName
+ ch, err := conn.Channel()
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+ defer ch.Close()
+ if err := ch.ExchangeDeclare(
+ exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.ExchangeDeclare(
+ exchangeName, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueName, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueNameDLX, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ return nil
+}
+
type GoCDKPubSubInput struct {
sub *pubsub.Subscription
}
@@ -28,12 +81,31 @@ func (k *GoCDKPubSubInput) GetName() string {
}
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ topicUrl := configuration.GetString(prefix + "topic_url")
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {
return err
}
+ var conn *amqp.Connection
+ if sub.As(&conn) {
+ ch, err := conn.Channel()
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ _, err = ch.QueueInspect(getPath(subURL))
+ if err != nil {
+ if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
+ if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
+ return err
+ }
+ } else {
+ return err
+ }
+ }
+ }
k.sub = sub
return nil
}
@@ -43,14 +115,26 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
if err != nil {
return
}
- onSuccessFn = func() {
- msg.Ack()
- }
onFailureFn = func() {
if msg.Nackable() {
- msg.Nack()
+ isRedelivered := false
+ var delivery amqp.Delivery
+ if msg.As(&delivery) {
+ isRedelivered = delivery.Redelivered
+ glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
+ }
+ if isRedelivered {
+ if err := delivery.Nack(false, false); err != nil {
+ glog.Error(err)
+ }
+ } else {
+ msg.Nack()
+ }
}
}
+ onSuccessFn = func() {
+ msg.Ack()
+ }
key = msg.Metadata["key"]
message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message)