aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorKonstantin Lebedev <lebedev_k@tochka.com>2021-01-26 22:50:25 +0500
committerKonstantin Lebedev <lebedev_k@tochka.com>2021-01-26 22:50:25 +0500
commit612b7975a12da0ff7d84781b1bea338a5f4814c6 (patch)
tree92873f4447e1c5e9007d5c72099444af468b371b /weed
parentc41961d5cccf4a4b49e5968e0da283c6fb1f47c3 (diff)
downloadseaweedfs-612b7975a12da0ff7d84781b1bea338a5f4814c6.tar.xz
seaweedfs-612b7975a12da0ff7d84781b1bea338a5f4814c6.zip
replication to local disk storage
Diffstat (limited to 'weed')
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/scaffold.go4
-rw-r--r--weed/replication/sink/localsink/local_sink.go101
3 files changed, 106 insertions, 0 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 4f698e375..b6515e505 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 8b74274e5..415e0cba3 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -350,6 +350,10 @@ grpcAddress = "localhost:18888"
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
+[sink.local]
+enabled = false
+directory = "/backup"
+
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
new file mode 100644
index 000000000..76e0384e5
--- /dev/null
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -0,0 +1,101 @@
+package localsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+type LocalSink struct {
+ dir string
+ filerSource *source.FilerSource
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &LocalSink{})
+}
+
+func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
+ localsink.filerSource = s
+}
+
+func (localsink *LocalSink) GetName() string {
+ return "local"
+}
+
+func (localsink *LocalSink) isMultiPartEntry(key string) bool {
+ return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
+}
+
+func (localsink *LocalSink) initialize(dir string) error {
+ localsink.dir = dir
+ return nil
+}
+
+func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
+ dir := configuration.GetString(prefix + "directory")
+ glog.V(4).Infof("sink.local.directory: %v", dir)
+ return localsink.initialize(dir)
+}
+
+func (localsink *LocalSink) GetSinkToDirectory() string {
+ return localsink.dir
+}
+
+func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
+ if localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Delete Entry key: %s", key)
+ if err := os.Remove(key); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
+ if entry.IsDirectory || localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Create Entry key: %s", key)
+
+ totalSize := filer.FileSize(entry)
+ chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
+
+ dir := filepath.Dir(key)
+
+ if _, err := os.Stat(dir); os.IsNotExist(err) {
+ glog.V(4).Infof("Create Direcotry key: %s", dir)
+ if err = os.MkdirAll(dir, 0); err != nil {
+ return err
+ }
+ }
+
+ writeFunc := func(data []byte) error {
+ writeErr := ioutil.WriteFile(key, data, 0)
+ return writeErr
+ }
+
+ if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
+ if localsink.isMultiPartEntry(key) {
+ return true, nil
+ }
+ glog.V(4).Infof("Update Entry key: %s", key)
+ // do delete and create
+ return false, nil
+}