aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-01-28 02:03:33 -0800
committerGitHub <noreply@github.com>2021-01-28 02:03:33 -0800
commit822f1ade9d3f613e4f400ed0945f52b3bbba3780 (patch)
treeecd6b802f0458909814d25340829b56e65d5a325
parent5f72c388bcbd38477262ffc754b48485fdd087d0 (diff)
parentbe1062b7fcabf793ea3cbfd5a67b81c07610ce83 (diff)
downloadseaweedfs-822f1ade9d3f613e4f400ed0945f52b3bbba3780.tar.xz
seaweedfs-822f1ade9d3f613e4f400ed0945f52b3bbba3780.zip
Merge pull request #1762 from kmlebedev/backupsink
replication to create time date directory
-rw-r--r--docker/replication.toml5
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/scaffold.go7
-rw-r--r--weed/replication/replicator.go13
-rw-r--r--weed/replication/sink/localincrementalsink/local_incremental_sink.go18
-rw-r--r--weed/replication/sink/localsink/local_sink.go19
6 files changed, 45 insertions, 18 deletions
diff --git a/docker/replication.toml b/docker/replication.toml
index 2cee755a5..833bb1692 100644
--- a/docker/replication.toml
+++ b/docker/replication.toml
@@ -6,7 +6,6 @@ grpcAddress = "filer:18888"
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
-[sink.local]
+[sink.local_incremental]
enabled = true
-directory = "/data"
-todays_date_format = "2006-02-01" \ No newline at end of file
+directory = "/data" \ No newline at end of file
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index e8c06b208..47ef5d1b6 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localincrementalsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 1705c6ae4..a95c41054 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -353,9 +353,14 @@ directory = "/buckets"
[sink.local]
enabled = false
-directory = "/backup"
+directory = "/data"
todays_date_format = "" # set this to 2006-02-01 for incremental backup
+[sink.local_incremental]
+enabled = false
+# all replicated files are under modification time date directory tree
+directory = "/backup"
+
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index c4228434f..7688029e6 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -40,7 +41,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil
}
- newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
+ var dateKey string
+ if r.sink.GetName() == "local_incremental" {
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey = time.Unix(mTime, 0).Format("2006-01-02")
+ }
+ newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
glog.V(3).Infof("replicate %s => %s", key, newKey)
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
diff --git a/weed/replication/sink/localincrementalsink/local_incremental_sink.go b/weed/replication/sink/localincrementalsink/local_incremental_sink.go
new file mode 100644
index 000000000..97da3a5f7
--- /dev/null
+++ b/weed/replication/sink/localincrementalsink/local_incremental_sink.go
@@ -0,0 +1,18 @@
+package localincrementalsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+)
+
+type LocalIncSink struct {
+ localsink.LocalSink
+}
+
+func (localincsink *LocalIncSink) GetName() string {
+ return "local_incremental"
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &LocalIncSink{})
+}
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 5ca562ec8..21c625c3f 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -12,13 +12,11 @@ import (
"os"
"path/filepath"
"strings"
- "time"
)
type LocalSink struct {
- dir string
- todaysDateFormat string
- filerSource *source.FilerSource
+ Dir string
+ filerSource *source.FilerSource
}
func init() {
@@ -37,24 +35,19 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
}
-func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
- localsink.dir = dir
- localsink.todaysDateFormat = todaysDateFormat
+func (localsink *LocalSink) initialize(dir string) error {
+ localsink.Dir = dir
return nil
}
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory")
- todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
glog.V(4).Infof("sink.local.directory: %v", dir)
- return localsink.initialize(dir, todaysDateFormat)
+ return localsink.initialize(dir)
}
func (localsink *LocalSink) GetSinkToDirectory() string {
- if localsink.todaysDateFormat != "" {
- return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
- }
- return localsink.dir
+ return localsink.Dir
}
func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {