aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/ISSUE_TEMPLATE/bug_report.md1
-rw-r--r--docker/Makefile3
-rw-r--r--docker/local-mount-compose.yml4
-rw-r--r--docker/local-replicate-compose.yml59
-rw-r--r--docker/notification.toml17
-rw-r--r--docker/replication.toml12
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/scaffold.go8
-rw-r--r--weed/filesys/meta_cache/meta_cache.go8
-rw-r--r--weed/replication/sink/localsink/local_sink.go108
-rw-r--r--weed/shell/command_volume_fix_replication.go38
13 files changed, 253 insertions, 10 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
index 53423f4b7..edb95fac1 100644
--- a/.github/ISSUE_TEMPLATE/bug_report.md
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -9,6 +9,7 @@ assignees: ''
Sponsors SeaweedFS via Patreon https://www.patreon.com/seaweedfs
Report issues here. Ask questions here https://stackoverflow.com/questions/tagged/seaweedfs
+Please ask questions in https://github.com/chrislusf/seaweedfs/discussions
example of a good issue report:
https://github.com/chrislusf/seaweedfs/issues/1005
diff --git a/docker/Makefile b/docker/Makefile
index fe278f9b4..5949842f1 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -21,6 +21,9 @@ k8s: build
dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up
+dev_replicate: build
+ docker-compose -f local-replicate-compose.yml -p seaweedfs up
+
cluster: build
docker-compose -f local-cluster-compose.yml -p seaweedfs up
diff --git a/docker/local-mount-compose.yml b/docker/local-mount-compose.yml
index 97fc652d4..62a6691d6 100644
--- a/docker/local-mount-compose.yml
+++ b/docker/local-mount-compose.yml
@@ -31,7 +31,7 @@ services:
mount_1:
image: chrislusf/seaweedfs:local
privileged: true
- entrypoint: '/bin/sh -c "mkdir -p t1 && mkdir -p cache/t1 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t1 -dir=./t1 -filer.path=/"'
+ entrypoint: '/bin/sh -c "mkdir -p t1 && mkdir -p cache/t1 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t1 -dir=./t1 -filer.path=/c1"'
depends_on:
- master
- volume
@@ -39,7 +39,7 @@ services:
mount_2:
image: chrislusf/seaweedfs:local
privileged: true
- entrypoint: '/bin/sh -c "mkdir -p t2 && mkdir -p cache/t2 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t2 -dir=./t2 -filer.path=/"'
+ entrypoint: '/bin/sh -c "mkdir -p t2 && mkdir -p cache/t2 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t2 -dir=./t2 -filer.path=/c1"'
depends_on:
- master
- volume
diff --git a/docker/local-replicate-compose.yml b/docker/local-replicate-compose.yml
new file mode 100644
index 000000000..a8e0f808e
--- /dev/null
+++ b/docker/local-replicate-compose.yml
@@ -0,0 +1,59 @@
+version: '2'
+
+services:
+ master:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9333:9333
+ - 19333:19333
+ command: "master -ip=master"
+ volume:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8080:8080
+ - 18080:18080
+ command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1"
+ depends_on:
+ - master
+ filer:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8888:8888
+ - 18888:18888
+ command: '-v=9 filer -master="master:9333"'
+ restart: on-failure
+ volumes:
+ - ./notification.toml:/etc/seaweedfs/notification.toml
+ depends_on:
+ - master
+ - volume
+ - rabbitmq
+ - replicate
+ environment:
+ RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
+ replicate:
+ image: chrislusf/seaweedfs:local
+ command: '-v=9 filer.replicate'
+ restart: on-failure
+ volumes:
+ - ./notification.toml:/etc/seaweedfs/notification.toml
+ - ./replication.toml:/etc/seaweedfs/replication.toml
+ depends_on:
+ - rabbitmq
+ environment:
+ RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/"
+ s3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8333:8333
+ command: 's3 -filer="filer:8888"'
+ depends_on:
+ - master
+ - volume
+ - filer
+ rabbitmq:
+ image: rabbitmq:3.8.10-management-alpine
+ ports:
+ - 5672:5672
+ - 15671:15671
+ - 15672:15672 \ No newline at end of file
diff --git a/docker/notification.toml b/docker/notification.toml
new file mode 100644
index 000000000..dcd5f2c6f
--- /dev/null
+++ b/docker/notification.toml
@@ -0,0 +1,17 @@
+[notification.log]
+# this is only for debugging purposes and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = true
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "swexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin. Create swexchange of type fanout and swqueue, then
+# create the binding swexchange => swqueue.
+topic_url = "rabbit://swexchange"
+sub_url = "rabbit://swqueue" \ No newline at end of file
diff --git a/docker/replication.toml b/docker/replication.toml
new file mode 100644
index 000000000..2cee755a5
--- /dev/null
+++ b/docker/replication.toml
@@ -0,0 +1,12 @@
+[source.filer]
+enabled = true
+grpcAddress = "filer:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = true
+directory = "/data"
+todays_date_format = "2006-01-02" \ No newline at end of file
diff --git a/go.mod b/go.mod
index 39acd0e9b..cbd806799 100644
--- a/go.mod
+++ b/go.mod
@@ -56,7 +56,7 @@ require (
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
- github.com/seaweedfs/fuse v1.1.0
+ github.com/seaweedfs/fuse v1.1.1
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spaolacci/murmur3 v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index eb16fac4c..db7d2748a 100644
--- a/go.sum
+++ b/go.sum
@@ -579,6 +579,8 @@ github.com/seaweedfs/fuse v1.0.9 h1:3JZoGsW7cmmrd/U5KYcIGR2+EqyBvCYCoCpEdZAz/Dc=
github.com/seaweedfs/fuse v1.0.9/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
github.com/seaweedfs/fuse v1.1.0 h1:cL1qPHFNtFv0UuJTLjKKgWDzfJ4iZzTa4Y7ipc2acGw=
github.com/seaweedfs/fuse v1.1.0/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
+github.com/seaweedfs/fuse v1.1.1 h1:WD51YFJcBViOx8I89jeqPD+vAKl4EowzBy9GUw0plb0=
+github.com/seaweedfs/fuse v1.1.1/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 0b6eaf94e..e8c06b208 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -11,6 +11,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 8b74274e5..1705c6ae4 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -329,7 +329,8 @@ enabled = false
# This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin.
+# the RabbitMQ management plugin. Create myexchange of type fanout and myqueue, then
+# create the binding myexchange => myqueue.
topic_url = "rabbit://myexchange"
sub_url = "rabbit://myqueue"
`
@@ -350,6 +351,11 @@ grpcAddress = "localhost:18888"
# i.e., all files with this "prefix" are sent to notification message queue.
directory = "/buckets"
+[sink.local]
+enabled = false
+directory = "/backup"
+todays_date_format = "" # set this to 2006-01-02 (Go layout for YYYY-MM-DD) for incremental backup
+
[sink.filer]
enabled = false
grpcAddress = "localhost:18888"
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index f4e4d7d6e..b9d4724c9 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
+ "strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -29,7 +30,12 @@ func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMa
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
- invalidateFunc: invalidateFunc,
+ invalidateFunc: func(fullpath util.FullPath) {
+ if baseDir != "/" && strings.HasPrefix(string(fullpath), string(baseDir)) {
+ fullpath = fullpath[len(baseDir):]
+ }
+ invalidateFunc(fullpath)
+ },
}
}
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
new file mode 100644
index 000000000..5ca562ec8
--- /dev/null
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -0,0 +1,108 @@
+package localsink
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+// LocalSink is a replication sink that writes replicated filer entries to the
+// local file system.
+type LocalSink struct {
+ // dir is the base directory, read from the "directory" configuration key.
+ dir string
+ // todaysDateFormat is an optional time layout; when it is non-empty,
+ // GetSinkToDirectory appends the current time formatted with it to dir.
+ todaysDateFormat string
+ // filerSource is the source filer used to look up and read chunk data.
+ filerSource *source.FilerSource
+}
+
+// init registers an empty LocalSink in the shared sink.Sinks list, so importing
+// this package for its side effects is enough to make the sink available.
+func init() {
+	localSink := &LocalSink{}
+	sink.Sinks = append(sink.Sinks, localSink)
+}
+
+// SetSourceFiler stores the filer source that CreateEntry later uses to look up
+// file ids and read chunk data.
+func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) {
+ localsink.filerSource = s
+}
+
+// GetName returns the name of this sink, "local".
+// NOTE(review): presumably this selects the [sink.local] configuration section — confirm against the caller.
+func (localsink *LocalSink) GetName() string {
+ return "local"
+}
+
+// isMultiPartEntry reports whether key names a part of a multipart upload:
+// a path that ends in ".part" and contains a "/.uploads/" segment.
+func (localsink *LocalSink) isMultiPartEntry(key string) bool {
+	if !strings.HasSuffix(key, ".part") {
+		return false
+	}
+	return strings.Contains(key, "/.uploads/")
+}
+
+// initialize records the base directory and the optional date layout on the
+// sink. It performs no validation and always returns nil.
+func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
+	localsink.dir, localsink.todaysDateFormat = dir, todaysDateFormat
+	return nil
+}
+
+func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
+ dir := configuration.GetString(prefix + "directory")
+ todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
+ glog.V(4).Infof("sink.local.directory: %v", dir)
+ return localsink.initialize(dir, todaysDateFormat)
+}
+
+// GetSinkToDirectory returns the directory replicated entries are written to.
+// Without a date layout this is the configured base directory; with one, the
+// current time formatted with that layout is appended as a sub-directory.
+func (localsink *LocalSink) GetSinkToDirectory() string {
+	if localsink.todaysDateFormat == "" {
+		return localsink.dir
+	}
+	today := time.Now().Format(localsink.todaysDateFormat)
+	return filepath.Join(localsink.dir, today)
+}
+
+// DeleteEntry removes the local file or directory named by key.
+//
+// Multipart upload parts are ignored. Removal is best effort: a failure is
+// logged at verbosity 0 and not returned, so the method always returns nil.
+// isDirectory, deleteIncludeChunks and signatures are not used by this sink.
+func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
+	if !localsink.isMultiPartEntry(key) {
+		glog.V(4).Infof("Delete Entry key: %s", key)
+		if removeErr := os.Remove(key); removeErr != nil {
+			glog.V(0).Infof("remove entry key %s: %s", key, removeErr)
+		}
+	}
+	return nil
+}
+
+// CreateEntry copies the content of a filer entry into the local file named
+// by key.
+//
+// Directories and multipart upload parts are skipped. The parent directory is
+// created on demand with mode 0755. The destination is created (or truncated)
+// once with mode 0644 and every piece of data delivered by
+// repl_util.CopyFromChunkViews is appended through a single file handle, so an
+// entry delivered in several pieces is written in full instead of each piece
+// overwriting the previous one. An entry without chunk data yields an empty
+// file.
+//
+// It returns the first error from creating the directory or file, copying the
+// data, or closing the file. signatures is not used by this sink.
+func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
+	if entry.IsDirectory || localsink.isMultiPartEntry(key) {
+		return nil
+	}
+	glog.V(4).Infof("Create Entry key: %s", key)
+
+	totalSize := filer.FileSize(entry)
+	chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
+
+	dir := filepath.Dir(key)
+
+	if _, err := os.Stat(dir); os.IsNotExist(err) {
+		glog.V(4).Infof("Create Direcotry key: %s", dir)
+		// A mode of 0 would create directories that only root can traverse.
+		if err = os.MkdirAll(dir, 0755); err != nil {
+			return err
+		}
+	}
+
+	// Create or truncate the destination up front, so stale content is dropped
+	// even when no data is delivered afterwards.
+	if err := ioutil.WriteFile(key, nil, 0644); err != nil {
+		return err
+	}
+
+	// Stream all pieces through one append-mode handle; calling WriteFile per
+	// piece would truncate the file each time and keep only the last piece.
+	dstFile, err := os.OpenFile(key, os.O_WRONLY|os.O_APPEND, 0644)
+	if err != nil {
+		return err
+	}
+
+	writeFunc := func(data []byte) error {
+		_, writeErr := dstFile.Write(data)
+		return writeErr
+	}
+
+	if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
+		// The copy error is the one worth reporting; the close error is secondary.
+		dstFile.Close()
+		return err
+	}
+
+	return dstFile.Close()
+}
+
+// UpdateEntry never modifies anything on disk itself.
+//
+// For multipart upload parts it reports the entry as found (true, nil). For
+// every other key it logs the key at verbosity 4 and returns (false, nil).
+// NOTE(review): presumably the caller then falls back to deleting and
+// re-creating the entry — confirm against the replicator.
+func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
+	if !localsink.isMultiPartEntry(key) {
+		glog.V(4).Infof("Update Entry key: %s", key)
+		// not handled here: do delete and create
+		return false, nil
+	}
+	return true, nil
+}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 9b9abd8eb..8ae8850f3 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
+ "path/filepath"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -19,6 +20,7 @@ func init() {
}
type commandVolumeFixReplication struct {
+ collectionPattern *string
}
func (c *commandVolumeFixReplication) Name() string {
@@ -33,12 +35,13 @@ func (c *commandVolumeFixReplication) Help() string {
This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
- volume.fix.replication -n # do not take action
- volume.fix.replication # actually deleting or copying the volume files and mount the volume
+ volume.fix.replication -n # do not take action
+ volume.fix.replication # actually deleting or copying the volume files and mount the volume
+ volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important"
Note:
- * each time this will only add back one replica for one volume id. If there are multiple replicas
- are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
+ * each time this will only add back one replica for each volume id that is under replicated.
+ If there are multiple replicas are missing, e.g. replica count is > 2, you may need to run this multiple times.
* do not run this too quickly within seconds, since the new volume replica may take a few seconds
to register itself to the master.
@@ -52,6 +55,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
@@ -127,6 +131,17 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ break
+ }
+ }
+
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
if !takeAction {
@@ -147,9 +162,22 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
replica := pickOneReplicaToCopyFrom(replicas)
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
+ hasSkippedCollection := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ hasSkippedCollection = true
+ break
+ }
+ }
+
// ask the volume server to replicate the volume
foundNewLocation = true
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
@@ -179,7 +207,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
break
}
}
- if !foundNewLocation {
+ if !foundNewLocation && !hasSkippedCollection {
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}