diff options
| author | Konstantin Lebedev <lebedev_k@tochka.com> | 2021-01-26 22:50:25 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <lebedev_k@tochka.com> | 2021-01-26 22:50:25 +0500 |
| commit | 612b7975a12da0ff7d84781b1bea338a5f4814c6 (patch) | |
| tree | 92873f4447e1c5e9007d5c72099444af468b371b /weed | |
| parent | c41961d5cccf4a4b49e5968e0da283c6fb1f47c3 (diff) | |
| download | seaweedfs-612b7975a12da0ff7d84781b1bea338a5f4814c6.tar.xz seaweedfs-612b7975a12da0ff7d84781b1bea338a5f4814c6.zip | |
replication to local disk storage
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/filer_replication.go | 1 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/localsink/local_sink.go | 101 |
3 files changed, 106 insertions, 0 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 4f698e375..b6515e505 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -11,6 +11,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 8b74274e5..415e0cba3 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -350,6 +350,10 @@ grpcAddress = "localhost:18888" # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" +[sink.local] +enabled = false +directory = "/backup" + [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go new file mode 100644 index 000000000..76e0384e5 --- /dev/null +++ b/weed/replication/sink/localsink/local_sink.go @@ -0,0 +1,101 @@ +package localsink + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "io/ioutil" + "os" + "path/filepath" + "strings" +) + +type LocalSink struct { + dir string + filerSource *source.FilerSource +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalSink{}) +} + +func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) { + localsink.filerSource = s +} + +func (localsink *LocalSink) GetName() string { + return "local" +} + +func (localsink *LocalSink) isMultiPartEntry(key string) bool { + return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") +} + +func (localsink *LocalSink) initialize(dir string) error { + localsink.dir = dir + return nil +} + +func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { + dir := configuration.GetString(prefix + "directory") + glog.V(4).Infof("sink.local.directory: %v", dir) + return localsink.initialize(dir) +} + +func (localsink *LocalSink) GetSinkToDirectory() string { + return localsink.dir +} + +func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + if localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Delete Entry key: %s", key) + if err := os.Remove(key); err != nil { + return err + } + return nil +} + +func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + if entry.IsDirectory || localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Create Entry key: %s", key) + + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + + dir := filepath.Dir(key) + + if _, err := os.Stat(dir); os.IsNotExist(err) { + glog.V(4).Infof("Create Direcotry key: %s", dir) + if err = os.MkdirAll(dir, 0); err != nil { + return err + } + } + + writeFunc := func(data []byte) error { + writeErr := ioutil.WriteFile(key, data, 0) + return writeErr + } + + if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { + return err + } + + return nil +} + +func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { + if localsink.isMultiPartEntry(key) { + return true, nil + } + glog.V(4).Infof("Update Entry key: %s", key) + // do delete and create + return false, nil +} |
