aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-01-26 22:31:54 -0800
committerGitHub <noreply@github.com>2021-01-26 22:31:54 -0800
commit5e07afb0f0c1eca20da0780c806973db40ea6a24 (patch)
tree2d2167b2fb8994a1319d8c72c2f9f10087bf9daa
parentb3f66199db4b6a9da48f091a33e87470877d6d73 (diff)
parent612b7975a12da0ff7d84781b1bea338a5f4814c6 (diff)
downloadseaweedfs-5e07afb0f0c1eca20da0780c806973db40ea6a24.tar.xz
seaweedfs-5e07afb0f0c1eca20da0780c806973db40ea6a24.zip
Merge pull request #1759 from kmlebedev/sink.local
replication to local disk storage
-rw-r--r--docker/Makefile3
-rw-r--r--docker/local-replicate-compose.yml59
-rw-r--r--docker/notification.toml16
-rw-r--r--docker/replication.toml11
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/scaffold.go4
-rw-r--r--weed/replication/sink/localsink/local_sink.go101
7 files changed, 195 insertions, 0 deletions
diff --git a/docker/Makefile b/docker/Makefile
index fe278f9b4..5949842f1 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -21,6 +21,9 @@ k8s: build
dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up
+dev_replicate: build
+ docker-compose -f local-replicate-compose.yml -p seaweedfs up
+
cluster: build
docker-compose -f local-cluster-compose.yml -p seaweedfs up
diff --git a/docker/local-replicate-compose.yml b/docker/local-replicate-compose.yml
new file mode 100644
index 000000000..a8e0f808e
--- /dev/null
+++ b/docker/local-replicate-compose.yml
@@ -0,0 +1,59 @@
+version: '2'
+
+services:
+ master:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9333:9333
+ - 19333:19333
+ command: "master -ip=master"
+ volume:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8080:8080
+ - 18080:18080
+ command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
+ depends_on:
+ - master
+ filer:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8888:8888
+ - 18888:18888
+ command: '-v=9 filer -master="master:9333"'
+ restart: on-failure
+ volumes:
+ - ./notification.toml:/etc/seaweedfs/notification.toml
+ depends_on:
+ - master
+ - volume
+ - rabbitmq
+ - replicate
+ environment:
+ RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
+ replicate:
+ image: chrislusf/seaweedfs:local
+ command: '-v=9 filer.replicate'
+ restart: on-failure
+ volumes:
+ - ./notification.toml:/etc/seaweedfs/notification.toml
+ - ./replication.toml:/etc/seaweedfs/replication.toml
+ depends_on:
+ - rabbitmq
+ environment:
+ RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
+ s3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8333:8333
+ command: 's3 -filer="filer:8888"'
+ depends_on:
+ - master
+ - volume
+ - filer
+ rabbitmq:
+ image: rabbitmq:3.8.10-management-alpine
+ ports:
+ - 5672:5672
+ - 15671:15671
+ - 15672:15672 \ No newline at end of file
diff --git a/docker/notification.toml b/docker/notification.toml
new file mode 100644
index 000000000..4ed76825e
--- /dev/null
+++ b/docker/notification.toml
@@ -0,0 +1,16 @@
+[notification.log]
+# this is only for debugging perpose and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = true
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin.
+topic_url = "rabbit://swexchange"
+sub_url = "rabbit://swqueue" \ No newline at end of file
diff --git a/docker/replication.toml b/docker/replication.toml
new file mode 100644
index 000000000..84c0bec28
--- /dev/null
+++ b/docker/replication.toml
@@ -0,0 +1,11 @@
+[source.filer]
+enabled = true
+grpcAddress = "filer:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = true
+directory = "/data" \ No newline at end of file
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 0b6eaf94e..e8c06b208 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 8b74274e5..415e0cba3 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -350,6 +350,10 @@ grpcAddress = "localhost:18888"
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
+[sink.local]
+enabled = false
+directory = "/backup"
+
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
new file mode 100644
index 000000000..76e0384e5
--- /dev/null
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -0,0 +1,101 @@
+package localsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+type LocalSink struct {
+ dir string
+ filerSource *source.FilerSource
+}
+
+func init() {
+ sink.Sinks = append(sink.Sinks, &LocalSink{})
+}
+
+func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
+ localsink.filerSource = s
+}
+
+func (localsink *LocalSink) GetName() string {
+ return "local"
+}
+
+func (localsink *LocalSink) isMultiPartEntry(key string) bool {
+ return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
+}
+
+func (localsink *LocalSink) initialize(dir string) error {
+ localsink.dir = dir
+ return nil
+}
+
+func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
+ dir := configuration.GetString(prefix + "directory")
+ glog.V(4).Infof("sink.local.directory: %v", dir)
+ return localsink.initialize(dir)
+}
+
+func (localsink *LocalSink) GetSinkToDirectory() string {
+ return localsink.dir
+}
+
+func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
+ if localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Delete Entry key: %s", key)
+ if err := os.Remove(key); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
+ if entry.IsDirectory || localsink.isMultiPartEntry(key) {
+ return nil
+ }
+ glog.V(4).Infof("Create Entry key: %s", key)
+
+ totalSize := filer.FileSize(entry)
+ chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
+
+ dir := filepath.Dir(key)
+
+ if _, err := os.Stat(dir); os.IsNotExist(err) {
+ glog.V(4).Infof("Create Direcotry key: %s", dir)
+ if err = os.MkdirAll(dir, 0); err != nil {
+ return err
+ }
+ }
+
+ writeFunc := func(data []byte) error {
+ writeErr := ioutil.WriteFile(key, data, 0)
+ return writeErr
+ }
+
+ if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
+ if localsink.isMultiPartEntry(key) {
+ return true, nil
+ }
+ glog.V(4).Infof("Update Entry key: %s", key)
+ // do delete and create
+ return false, nil
+}