diff options
| -rw-r--r-- | .github/ISSUE_TEMPLATE/bug_report.md | 1 | ||||
| -rw-r--r-- | docker/Makefile | 3 | ||||
| -rw-r--r-- | docker/local-mount-compose.yml | 4 | ||||
| -rw-r--r-- | docker/local-replicate-compose.yml | 59 | ||||
| -rw-r--r-- | docker/notification.toml | 17 | ||||
| -rw-r--r-- | docker/replication.toml | 12 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | go.sum | 2 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 1 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 8 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 8 | ||||
| -rw-r--r-- | weed/replication/sink/localsink/local_sink.go | 108 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 38 |
13 files changed, 253 insertions, 10 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 53423f4b7..edb95fac1 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -9,6 +9,7 @@ assignees: '' Sponsors SeaweedFS via Patreon https://www.patreon.com/seaweedfs Report issues here. Ask questions here https://stackoverflow.com/questions/tagged/seaweedfs +Please ask questions in https://github.com/chrislusf/seaweedfs/discussions example of a good issue report: https://github.com/chrislusf/seaweedfs/issues/1005 diff --git a/docker/Makefile b/docker/Makefile index fe278f9b4..5949842f1 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -21,6 +21,9 @@ k8s: build dev_registry: build docker-compose -f local-registry-compose.yml -p seaweedfs up +dev_replicate: build + docker-compose -f local-replicate-compose.yml -p seaweedfs up + cluster: build docker-compose -f local-cluster-compose.yml -p seaweedfs up diff --git a/docker/local-mount-compose.yml b/docker/local-mount-compose.yml index 97fc652d4..62a6691d6 100644 --- a/docker/local-mount-compose.yml +++ b/docker/local-mount-compose.yml @@ -31,7 +31,7 @@ services: mount_1: image: chrislusf/seaweedfs:local privileged: true - entrypoint: '/bin/sh -c "mkdir -p t1 && mkdir -p cache/t1 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t1 -dir=./t1 -filer.path=/"' + entrypoint: '/bin/sh -c "mkdir -p t1 && mkdir -p cache/t1 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t1 -dir=./t1 -filer.path=/c1"' depends_on: - master - volume @@ -39,7 +39,7 @@ services: mount_2: image: chrislusf/seaweedfs:local privileged: true - entrypoint: '/bin/sh -c "mkdir -p t2 && mkdir -p cache/t2 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t2 -dir=./t2 -filer.path=/"' + entrypoint: '/bin/sh -c "mkdir -p t2 && mkdir -p cache/t2 && weed -v=4 mount -filer=filer:8888 -cacheDir=./cache/t2 -dir=./t2 -filer.path=/c1"' depends_on: - master - volume diff --git a/docker/local-replicate-compose.yml b/docker/local-replicate-compose.yml new file mode 100644 index 000000000..a8e0f808e --- /dev/null +++ b/docker/local-replicate-compose.yml @@ -0,0 +1,59 @@ +version: '2' + +services: + master: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + command: "master -ip=master" + volume: + image: chrislusf/seaweedfs:local + ports: + - 8080:8080 + - 18080:18080 + command: "volume -mserver=master:9333 -port=8080 -ip=volume -preStopSeconds=1" + depends_on: + - master + filer: + image: chrislusf/seaweedfs:local + ports: + - 8888:8888 + - 18888:18888 + command: '-v=9 filer -master="master:9333"' + restart: on-failure + volumes: + - ./notification.toml:/etc/seaweedfs/notification.toml + depends_on: + - master + - volume + - rabbitmq + - replicate + environment: + RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/" + replicate: + image: chrislusf/seaweedfs:local + command: '-v=9 filer.replicate' + restart: on-failure + volumes: + - ./notification.toml:/etc/seaweedfs/notification.toml + - ./replication.toml:/etc/seaweedfs/replication.toml + depends_on: + - rabbitmq + environment: + RABBIT_SERVER_URL: "amqp://guest:guest@rabbitmq:5672/" + s3: + image: chrislusf/seaweedfs:local + ports: + - 8333:8333 + command: 's3 -filer="filer:8888"' + depends_on: + - master + - volume + - filer + rabbitmq: + image: rabbitmq:3.8.10-management-alpine + ports: + - 5672:5672 + - 15671:15671 + - 15672:15672
\ No newline at end of file diff --git a/docker/notification.toml b/docker/notification.toml new file mode 100644 index 000000000..dcd5f2c6f --- /dev/null +++ b/docker/notification.toml @@ -0,0 +1,17 @@ +[notification.log] +# this is only for debugging perpose and does not work with "weed filer.replicate" +enabled = false + + +[notification.gocdk_pub_sub] +# The Go Cloud Development Kit (https://gocloud.dev). +# PubSub API (https://godoc.org/gocloud.dev/pubsub). +# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ. +enabled = true +# This URL will Dial the RabbitMQ server at the URL in the environment +# variable RABBIT_SERVER_URL and open the exchange "myexchange". +# The exchange must have already been created by some other means, like +# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then +# create binding myexchange => myqueue +topic_url = "rabbit://swexchange" +sub_url = "rabbit://swqueue"
\ No newline at end of file diff --git a/docker/replication.toml b/docker/replication.toml new file mode 100644 index 000000000..2cee755a5 --- /dev/null +++ b/docker/replication.toml @@ -0,0 +1,12 @@ +[source.filer] +enabled = true +grpcAddress = "filer:18888" +# all files under this directory tree are replicated. +# this is not a directory on your hard drive, but on your filer. +# i.e., all files with this "prefix" are sent to notification message queue. +directory = "/buckets" + +[sink.local] +enabled = true +directory = "/data" +todays_date_format = "2006-02-01"
\ No newline at end of file @@ -56,7 +56,7 @@ require ( github.com/prometheus/client_golang v1.3.0 github.com/rakyll/statik v0.1.7 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect - github.com/seaweedfs/fuse v1.1.0 + github.com/seaweedfs/fuse v1.1.1 github.com/seaweedfs/goexif v1.0.2 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -579,6 +579,8 @@ github.com/seaweedfs/fuse v1.0.9 h1:3JZoGsW7cmmrd/U5KYcIGR2+EqyBvCYCoCpEdZAz/Dc= github.com/seaweedfs/fuse v1.0.9/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8= github.com/seaweedfs/fuse v1.1.0 h1:cL1qPHFNtFv0UuJTLjKKgWDzfJ4iZzTa4Y7ipc2acGw= github.com/seaweedfs/fuse v1.1.0/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8= +github.com/seaweedfs/fuse v1.1.1 h1:WD51YFJcBViOx8I89jeqPD+vAKl4EowzBy9GUw0plb0= +github.com/seaweedfs/fuse v1.1.1/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8= github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E= github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk= github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw= diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 0b6eaf94e..e8c06b208 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -11,6 +11,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 8b74274e5..1705c6ae4 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -329,7 +329,8 @@ enabled = false # This URL will Dial the RabbitMQ server at the URL in the environment # variable RABBIT_SERVER_URL and open the exchange "myexchange". # The exchange must have already been created by some other means, like -# the RabbitMQ management plugin. +# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then +# create binding myexchange => myqueue topic_url = "rabbit://myexchange" sub_url = "rabbit://myqueue" ` @@ -350,6 +351,11 @@ grpcAddress = "localhost:18888" # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" +[sink.local] +enabled = false +directory = "/backup" +todays_date_format = "" # set this to 2006-02-01 for incremental backup + [sink.filer] enabled = false grpcAddress = "localhost:18888" diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index f4e4d7d6e..b9d4724c9 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strings" "sync" "github.com/chrislusf/seaweedfs/weed/filer" @@ -29,7 +30,12 @@ func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMa localStore: openMetaStore(dbFolder), visitedBoundary: bounded_tree.NewBoundedTree(baseDir), uidGidMapper: uidGidMapper, - invalidateFunc: invalidateFunc, + invalidateFunc: func(fullpath util.FullPath) { + if baseDir != "/" && strings.HasPrefix(string(fullpath), string(baseDir)) { + fullpath = fullpath[len(baseDir):] + } + invalidateFunc(fullpath) + }, } } diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go new file mode 100644 index 000000000..5ca562ec8 --- /dev/null +++ b/weed/replication/sink/localsink/local_sink.go @@ -0,0 +1,108 @@ +package localsink + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/repl_util" + "github.com/chrislusf/seaweedfs/weed/replication/sink" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" +) + +type LocalSink struct { + dir string + todaysDateFormat string + filerSource *source.FilerSource +} + +func init() { + sink.Sinks = append(sink.Sinks, &LocalSink{}) +} + +func (localsink *LocalSink) SetSourceFiler(s *source.FilerSource) { + localsink.filerSource = s +} + +func (localsink *LocalSink) GetName() string { + return "local" +} + +func (localsink *LocalSink) isMultiPartEntry(key string) bool { + return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/") +} + +func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error { + localsink.dir = dir + localsink.todaysDateFormat = todaysDateFormat + return nil +} + +func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error { + dir := configuration.GetString(prefix + "directory") + todaysDateFormat := configuration.GetString(prefix + "todays_date_format") + glog.V(4).Infof("sink.local.directory: %v", dir) + return localsink.initialize(dir, todaysDateFormat) +} + +func (localsink *LocalSink) GetSinkToDirectory() string { + if localsink.todaysDateFormat != "" { + return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat)) + } + return localsink.dir +} + +func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { + if localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Delete Entry key: %s", key) + if err := os.Remove(key); err != nil { + glog.V(0).Infof("remove entry key %s: %s", key, err) + } + return nil +} + +func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error { + if entry.IsDirectory || localsink.isMultiPartEntry(key) { + return nil + } + glog.V(4).Infof("Create Entry key: %s", key) + + totalSize := filer.FileSize(entry) + chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) + + dir := filepath.Dir(key) + + if _, err := os.Stat(dir); os.IsNotExist(err) { + glog.V(4).Infof("Create Direcotry key: %s", dir) + if err = os.MkdirAll(dir, 0); err != nil { + return err + } + } + + writeFunc := func(data []byte) error { + writeErr := ioutil.WriteFile(key, data, 0) + return writeErr + } + + if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil { + return err + } + + return nil +} + +func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) { + if localsink.isMultiPartEntry(key) { + return true, nil + } + glog.V(4).Infof("Update Entry key: %s", key) + // do delete and create + return false, nil +} diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 9b9abd8eb..8ae8850f3 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" + "path/filepath" "sort" "github.com/chrislusf/seaweedfs/weed/operation" @@ -19,6 +20,7 @@ func init() { } type commandVolumeFixReplication struct { + collectionPattern *string } func (c *commandVolumeFixReplication) Name() string { @@ -33,12 +35,13 @@ func (c *commandVolumeFixReplication) Help() string { This command also finds all under-replicated volumes, and finds volume servers with free slots. If the free slots satisfy the replication requirement, the volume content is copied over and mounted. - volume.fix.replication -n # do not take action - volume.fix.replication # actually deleting or copying the volume files and mount the volume + volume.fix.replication -n # do not take action + volume.fix.replication # actually deleting or copying the volume files and mount the volume + volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important" Note: - * each time this will only add back one replica for one volume id. If there are multiple replicas - are missing, e.g. multiple volume servers are new, you may need to run this multiple times. + * each time this will only add back one replica for each volume id that is under replicated. + If there are multiple replicas are missing, e.g. replica count is > 2, you may need to run this multiple times. * do not run this too quickly within seconds, since the new volume replica may take a few seconds to register itself to the master. @@ -52,6 +55,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") if err = volFixReplicationCommand.Parse(args); err != nil { return nil @@ -127,6 +131,17 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma replica := pickOneReplicaToDelete(replicas, replicaPlacement) + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + break + } + } + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) if !takeAction { @@ -147,9 +162,22 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false + hasSkippedCollection := false for _, dst := range allLocations { // check whether data nodes satisfy the constraints if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + hasSkippedCollection = true + break + } + } + // ask the volume server to replicate the volume foundNewLocation = true fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) @@ -179,7 +207,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm break } } - if !foundNewLocation { + if !foundNewLocation && !hasSkippedCollection { fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } |
