aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-09 11:37:07 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-09 11:37:07 -0800
commit821c46edf10097200b986bd17dc01d3991cf57ff (patch)
treeca181a9ef3c2f7e45cf0dbb40373b87717a9a636 /weed/replication
parent15da5834e1a33d060924740ba195f6bcd79f2af2 (diff)
parenta6e8d606b47e5f3e8cd8a57d2769d6f1404fbc8f (diff)
downloadseaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.tar.xz
seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.zip
Merge branch 'master' into support_ssd_volume
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/repl_util/replication_utli.go10
-rw-r--r--weed/replication/replicator.go13
-rw-r--r--weed/replication/sink/filersink/fetch_write.go4
-rw-r--r--weed/replication/sink/filersink/filer_sink.go35
-rw-r--r--weed/replication/sink/localsink/local_incremental_sink.go17
-rw-r--r--weed/replication/sink/localsink/local_sink.go101
-rw-r--r--weed/replication/source/filer_source.go19
-rw-r--r--weed/replication/sub/notification_aws_sqs.go2
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go103
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go10
-rw-r--r--weed/replication/sub/notification_kafka.go2
-rw-r--r--weed/replication/sub/notifications.go2
12 files changed, 289 insertions, 29 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index 42777f4ad..c5b8ab4e1 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -17,9 +17,10 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
}
var writeErr error
+ var shouldRetry bool
for _, fileUrl := range fileUrls {
- _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
@@ -30,11 +31,12 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
break
}
}
-
- if err != nil {
+ if shouldRetry && err != nil {
return err
}
-
+ if writeErr != nil {
+ return writeErr
+ }
}
return nil
}
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index c4228434f..7688029e6 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil
}
- newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
+ var dateKey string
+ if r.sink.GetName() == "local_incremental" {
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey = time.Unix(mTime, 0).Format("2006-01-02")
+ }
+ newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
glog.V(3).Infof("replicate %s => %s", key, newKey)
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 6b51a7966..a7392d856 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -30,6 +30,7 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
replicatedChunk, e := fs.replicateOneChunk(chunk, path)
if e != nil {
err = e
+ return
}
replicatedChunks[index] = replicatedChunk
}(sourceChunk, chunkIndex)
@@ -98,6 +99,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if fs.writeChunkByFiler {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
+ }
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index a69c9dc8e..9960634f6 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,6 +3,8 @@ package filersink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@@ -17,15 +19,17 @@ import (
)
type FilerSink struct {
- filerSource *source.FilerSource
- grpcAddress string
- dir string
- replication string
- collection string
- ttlSec int32
+ filerSource *source.FilerSource
+ grpcAddress string
+ dir string
+ replication string
+ collection string
+ ttlSec int32
diskType string
- dataCenter string
- grpcDialOption grpc.DialOption
+ dataCenter string
+ grpcDialOption grpc.DialOption
+ address string
+ writeChunkByFiler bool
}
func init() {
@@ -42,21 +46,27 @@ func (fs *FilerSink) GetSinkToDirectory() string {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
+ "",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
configuration.GetString(prefix+"replication"),
configuration.GetString(prefix+"collection"),
configuration.GetInt(prefix+"ttlSec"),
configuration.GetString(prefix+"disk"),
- security.LoadClientTLS(util.GetViper(), "grpc.client"))
+ security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ false)
}
func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) {
fs.filerSource = s
}
-func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
- replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption) (err error) {
+func (fs *FilerSink) DoInitialize(address, grpcAddress string, dir string,
+ replication string, collection string, ttlSec int, diskType string, grpcDialOption grpc.DialOption, writeChunkByFiler bool) (err error) {
+ fs.address = address
+ if fs.address == "" {
+ fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
+ }
fs.grpcAddress = grpcAddress
fs.dir = dir
fs.replication = replication
@@ -64,6 +74,7 @@ func (fs *FilerSink) DoInitialize(grpcAddress string, dir string,
fs.ttlSec = int32(ttlSec)
fs.diskType = diskType
fs.grpcDialOption = grpcDialOption
+ fs.writeChunkByFiler = writeChunkByFiler
return nil
}
@@ -209,7 +220,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
-func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
if aErr != nil {
return nil, nil, aErr
diff --git a/weed/replication/sink/localsink/local_incremental_sink.go b/weed/replication/sink/localsink/local_incremental_sink.go
new file mode 100644
index 000000000..a1d49e28a
--- /dev/null
+++ b/weed/replication/sink/localsink/local_incremental_sink.go
@@ -0,0 +1,17 @@
+package localsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+)
+
+type LocalIncSink struct {
+ LocalSink
+}
+
+func (localincsink *LocalIncSink) GetName() string {
+ return "local_incremental"
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &LocalIncSink{})
+}
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
new file mode 100644
index 000000000..21c625c3f
--- /dev/null
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -0,0 +1,101 @@
+package localsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+type LocalSink struct {
+ Dir string
+ filerSource *source.FilerSource
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &LocalSink{})
+}
+
+func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
+ localsink.filerSource = s
+}
+
+func (localsink *LocalSink) GetName() string {
+ return "local"
+}
+
+func (localsink *LocalSink) isMultiPartEntry(key string) bool {
+ return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
+}
+
+func (localsink *LocalSink) initialize(dir string) error {
+ localsink.Dir = dir
+ return nil
+}
+
+func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
+ dir := configuration.GetString(prefix + "directory")
+ glog.V(4).Infof("sink.local.directory: %v", dir)
+ return localsink.initialize(dir)
+}
+
+func (localsink *LocalSink) GetSinkToDirectory() string {
+ return localsink.Dir
+}
+
+func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
+ if localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Delete Entry key: %s", key)
+ if err := os.Remove(key); err != nil {
+ glog.V(0).Infof("remove entry key %s: %s", key, err)
+ }
+ return nil
+}
+
+func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
+ if entry.IsDirectory || localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Create Entry key: %s", key)
+
+ totalSize := filer.FileSize(entry)
+ chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
+
+ dir := filepath.Dir(key)
+
+ if _, err := os.Stat(dir); os.IsNotExist(err) {
+ glog.V(4).Infof("Create Direcotry key: %s", dir)
+ if err = os.MkdirAll(dir, 0); err != nil {
+ return err
+ }
+ }
+
+ writeFunc := func(data []byte) error {
+ writeErr := ioutil.WriteFile(key, data, 0)
+ return writeErr
+ }
+
+ if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
+ if localsink.isMultiPartEntry(key) {
+ return true, nil
+ }
+ glog.V(4).Infof("Update Entry key: %s", key)
+ // do delete and create
+ return false, nil
+}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index ff4f2eb26..3982360b0 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -25,19 +25,28 @@ type FilerSource struct {
grpcAddress string
grpcDialOption grpc.DialOption
Dir string
+ address string
+ proxyByFiler bool
}
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
return fs.DoInitialize(
+ "",
configuration.GetString(prefix+"grpcAddress"),
configuration.GetString(prefix+"directory"),
+ false,
)
}
-func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error) {
+func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, readChunkFromFiler bool) (err error) {
+ fs.address = address
+ if fs.address == "" {
+ fs.address = pb.GrpcAddressToServerAddress(grpcAddress)
+ }
fs.grpcAddress = grpcAddress
fs.Dir = dir
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ fs.proxyByFiler = readChunkFromFiler
return nil
}
@@ -81,9 +90,13 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
return
}
-func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
+func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
+
+ if fs.proxyByFiler {
+ return util.DownloadFile("http://" + fs.address + "/?proxyChunkId=" + fileId)
+ }
- fileUrls, err := fs.LookupFileId(part)
+ fileUrls, err := fs.LookupFileId(fileId)
if err != nil {
return "", nil, nil, err
}
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 1dd386ba7..642834c72 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
return nil
}
-func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
// receive message
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9726096e5..413c0e3cf 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -2,13 +2,17 @@ package sub
import (
"context"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
+ "github.com/streadway/amqp"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
+ "net/url"
+ "path"
+ "strings"
+
// _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
@@ -19,6 +23,55 @@ func init() {
NotificationInputs = append(NotificationInputs, &GoCDKPubSubInput{})
}
+func getPath(rawUrl string) string {
+ parsedUrl, _ := url.Parse(rawUrl)
+ return path.Join(parsedUrl.Host, parsedUrl.Path)
+}
+
+func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl string) error {
+ exchangeName := getPath(exchangeUrl)
+ queueName := getPath(queueUrl)
+ exchangeNameDLX := "DLX." + exchangeName
+ queueNameDLX := "DLX." + queueName
+ ch, err := conn.Channel()
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+ defer ch.Close()
+ if err := ch.ExchangeDeclare(
+ exchangeNameDLX, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.ExchangeDeclare(
+ exchangeName, "fanout", false, false, false, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueName, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if _, err := ch.QueueDeclare(
+ queueNameDLX, false, false, false, false,
+ amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
+ glog.Error(err)
+ return err
+ }
+ if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
+ glog.Error(err)
+ return err
+ }
+ return nil
+}
+
type GoCDKPubSubInput struct {
sub *pubsub.Subscription
}
@@ -28,23 +81,65 @@ func (k *GoCDKPubSubInput) GetName() string {
}
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
+ topicUrl := configuration.GetString(prefix + "topic_url")
subURL := configuration.GetString(prefix + "sub_url")
glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {
return err
}
+ var conn *amqp.Connection
+ if sub.As(&conn) {
+ ch, err := conn.Channel()
+ if err != nil {
+ return err
+ }
+ defer ch.Close()
+ _, err = ch.QueueInspect(getPath(subURL))
+ if err != nil {
+ if strings.HasPrefix(err.Error(), "Exception (404) Reason") {
+ if err := QueueDeclareAndBind(conn, topicUrl, subURL); err != nil {
+ return err
+ }
+ } else {
+ return err
+ }
+ }
+ }
k.sub = sub
return nil
}
-func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
+ if err != nil {
+ return
+ }
+ onFailureFn = func() {
+ if msg.Nackable() {
+ isRedelivered := false
+ var delivery amqp.Delivery
+ if msg.As(&delivery) {
+ isRedelivered = delivery.Redelivered
+ glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
+ }
+ if isRedelivered {
+ if err := delivery.Nack(false, false); err != nil {
+ glog.Error(err)
+ }
+ } else {
+ msg.Nack()
+ }
+ }
+ }
+ onSuccessFn = func() {
+ msg.Ack()
+ }
key = msg.Metadata["key"]
message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message)
if err != nil {
- return "", nil, err
+ return "", nil, onSuccessFn, onFailureFn, err
}
- return key, message, nil
+ return key, message, onSuccessFn, onFailureFn, nil
}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index a950bb42b..f7c767d4a 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
k.messageChan <- m
- m.Ack()
})
return err
}
-func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
m := <-k.messageChan
+ onSuccessFn = func() {
+ m.Ack()
+ }
+ onFailureFn = func() {
+ m.Nack()
+ }
+
// process the message
key = m.Attributes["key"]
message = &filer_pb.EventNotification{}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index fa9cfad9b..622a759ea 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
return nil
}
-func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg := <-k.messageChan
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 8a2668f98..d5a910db9 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -10,7 +10,7 @@ type NotificationInput interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error
- ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
}
var (