author    Bl1tz23 <alex3angle@gmail.com>  2021-08-10 13:45:24 +0300
committer Bl1tz23 <alex3angle@gmail.com>  2021-08-10 13:45:24 +0300
commit    1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch)
tree      12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/command
parent    e6e57db530217ff57b3622b4672b03ebb6313e96 (diff)
parent    f9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff)
merge master, resolve conflicts
Diffstat (limited to 'weed/command')
-rw-r--r--  weed/command/autocomplete.go             | 109
-rw-r--r--  weed/command/command.go                  |   7
-rw-r--r--  weed/command/filer.go                    |   7
-rw-r--r--  weed/command/filer_backup.go             |  55
-rw-r--r--  weed/command/filer_copy.go               |  37
-rw-r--r--  weed/command/filer_meta_backup.go        |  49
-rw-r--r--  weed/command/filer_meta_tail.go          |  37
-rw-r--r--  weed/command/filer_remote_sync.go        | 257
-rw-r--r--  weed/command/filer_replication.go        |   6
-rw-r--r--  weed/command/filer_sync.go               |  77
-rw-r--r--  weed/command/filer_sync_std.go           |   7
-rw-r--r--  weed/command/filer_sync_windows.go       |  12
-rw-r--r--  weed/command/fuse.go                     | 319
-rw-r--r--  weed/command/iam.go                      |   1
-rw-r--r--  weed/command/imports.go                  |  30
-rw-r--r--  weed/command/mount_std.go                |  30
-rw-r--r--  weed/command/scaffold.go                 | 528
-rw-r--r--  weed/command/scaffold/example.go         |  21
-rw-r--r--  weed/command/scaffold/filer.toml         | 232
-rw-r--r--  weed/command/scaffold/master.toml        |  63
-rw-r--r--  weed/command/scaffold/notification.toml  |  54
-rw-r--r--  weed/command/scaffold/replication.toml   |  71
-rw-r--r--  weed/command/scaffold/security.toml      |  60
-rw-r--r--  weed/command/scaffold/shell.toml         |  10
-rw-r--r--  weed/command/server.go                   |  14
-rw-r--r--  weed/command/shell.go                    |   1
-rw-r--r--  weed/command/upload.go                   |  17
-rw-r--r--  weed/command/volume.go                   |  60
28 files changed, 1310 insertions, 861 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go
new file mode 100644
index 000000000..9a545a183
--- /dev/null
+++ b/weed/command/autocomplete.go
@@ -0,0 +1,109 @@
+package command
+
+import (
+ "fmt"
+ flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
+ "github.com/posener/complete"
+ completeinstall "github.com/posener/complete/cmd/install"
+ "runtime"
+)
+
+func AutocompleteMain(commands []*Command) bool {
+ subCommands := make(map[string]complete.Command)
+ helpSubCommands := make(map[string]complete.Command)
+ for _, cmd := range commands {
+ flags := make(map[string]complete.Predictor)
+ cmd.Flag.VisitAll(func(flag *flag.Flag) {
+ flags["-"+flag.Name] = complete.PredictAnything
+ })
+
+ subCommands[cmd.Name()] = complete.Command{
+ Flags: flags,
+ }
+ helpSubCommands[cmd.Name()] = complete.Command{}
+ }
+ subCommands["help"] = complete.Command{Sub: helpSubCommands}
+
+ globalFlags := make(map[string]complete.Predictor)
+ flag.VisitAll(func(flag *flag.Flag) {
+ globalFlags["-"+flag.Name] = complete.PredictAnything
+ })
+
+ weedCmd := complete.Command{
+ Sub: subCommands,
+ Flags: globalFlags,
+ GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
+ }
+ cmp := complete.New("weed", weedCmd)
+
+ return cmp.Complete()
+}
+
+func installAutoCompletion() bool {
+ if runtime.GOOS == "windows" {
+ fmt.Println("windows is not supported")
+ return false
+ }
+
+ err := completeinstall.Install("weed")
+ if err != nil {
+ fmt.Printf("install failed! %s\n", err)
+ return false
+ }
+ fmt.Printf("autocompletion is enabled. Please restart your shell.\n")
+ return true
+}
+
+func uninstallAutoCompletion() bool {
+ if runtime.GOOS == "windows" {
+ fmt.Println("windows is not supported")
+ return false
+ }
+
+ err := completeinstall.Uninstall("weed")
+ if err != nil {
+ fmt.Printf("uninstall failed! %s\n", err)
+ return false
+ }
+ fmt.Printf("autocompletion is disable. Please restart your shell.\n")
+ return true
+}
+
+var cmdAutocomplete = &Command{
+ Run: runAutocomplete,
+ UsageLine: "autocomplete",
+ Short: "install autocomplete",
+ Long: `install autocompletion for the weed command into the current shell.
+
+ Supported shells are bash, zsh, and fish.
+ Windows is not supported.
+
+`,
+}
+
+func runAutocomplete(cmd *Command, args []string) bool {
+ if len(args) != 0 {
+ cmd.Usage()
+ }
+
+ return installAutoCompletion()
+}
+
+var cmdUnautocomplete = &Command{
+ Run: runUnautocomplete,
+ UsageLine: "autocomplete.uninstall",
+ Short: "uninstall autocomplete",
+ Long: `uninstall autocompletion for the weed command from the current shell.
+
+ Windows is not supported.
+
+`,
+}
+
+func runUnautocomplete(cmd *Command, args []string) bool {
+ if len(args) != 0 {
+ cmd.Usage()
+ }
+
+ return uninstallAutoCompletion()
+}
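The new autocomplete commands are built on github.com/posener/complete: AutocompleteMain turns the registered Commands and their flags into a completion tree, and the install/uninstall helpers add or remove the shell hook. For reference, a minimal, self-contained sketch of how such a predictor is typically wired into a CLI entry point (the sub-command names below are illustrative, not taken from this commit):

    package main

    import (
    	"fmt"
    	"os"

    	"github.com/posener/complete"
    )

    func main() {
    	// Illustrative completion tree; weed builds its tree from Commands.
    	weedCmd := complete.Command{
    		Sub: complete.Commands{
    			"mount": {Flags: complete.Flags{"-filer": complete.PredictAnything}},
    			"filer": {},
    		},
    	}
    	// Complete() acts only when the shell invokes the binary as a
    	// completion hook (COMP_LINE/COMP_POINT set); it prints the
    	// candidates and returns true, so the normal CLI path is skipped.
    	if complete.New("weed", weedCmd).Complete() {
    		return
    	}
    	fmt.Println("normal CLI execution:", os.Args)
    }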
diff --git a/weed/command/command.go b/weed/command/command.go
index 18e53ad8c..02de2bd35 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -8,17 +8,20 @@ import (
)
var Commands = []*Command{
- cmdBenchmark,
+ cmdAutocomplete,
+ cmdUnautocomplete,
cmdBackup,
+ cmdBenchmark,
cmdCompact,
- cmdCopy,
cmdDownload,
cmdExport,
cmdFiler,
cmdFilerBackup,
cmdFilerCat,
+ cmdFilerCopy,
cmdFilerMetaBackup,
cmdFilerMetaTail,
+ cmdFilerRemoteSynchronize,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a723b4d8a..ddee0852c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -50,6 +50,8 @@ type FilerOptions struct {
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
+ debug *bool
+ debugPort *int
}
func init() {
@@ -73,6 +75,8 @@ func init() {
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
+ f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -122,6 +126,9 @@ var cmdFiler = &Command{
}
func runFiler(cmd *Command, args []string) bool {
+ if *f.debug {
+ go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
+ }
util.LoadConfiguration("security", false)
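The new -debug/-debug.port flags work together with the blank import of net/http/pprof added in weed/command/imports.go below: that import registers the profiling handlers on http.DefaultServeMux, so a bare ListenAndServe with a nil handler is enough to expose them. A minimal standalone illustration of the same mechanism:

    package main

    import (
    	"net/http"
    	_ "net/http/pprof" // registers /debug/pprof/* on the default mux
    )

    func main() {
    	// after this, e.g. http://localhost:6060/debug/pprof/goroutine?debug=2
    	go http.ListenAndServe(":6060", nil)
    	select {} // stand-in for the real server's work
    }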
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 888b46fe7..0c450181b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -52,11 +49,11 @@ var cmdFilerBackup = &Command{
func runFilerBackup(cmd *Command, args []string) bool {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
if err != nil {
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}
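This commit replaces the hand-rolled SubscribeMetadata loops (such as the one deleted above) with two shared helpers in weed/pb: FollowMetadata drives the subscription, and AddOffsetFunc decorates the event handler with periodic offset checkpointing. The same pattern recurs in filer.meta.backup, filer.meta.tail, filer.remote.sync, and filer.sync below. A sketch of what AddOffsetFunc plausibly looks like, reconstructed from the deleted loop (assumes the time and weed/pb/filer_pb imports; the committed helper may differ in detail):

    // Reconstructed sketch: wrap an event processor so that roughly every
    // `interval` the offset callback is given the number of events handled
    // since the last checkpoint and the timestamp of the latest event.
    func AddOffsetFunc(
    	processEventFn func(resp *filer_pb.SubscribeMetadataResponse) error,
    	interval time.Duration,
    	offsetFn func(counter int64, lastTsNs int64) error,
    ) func(resp *filer_pb.SubscribeMetadataResponse) error {
    	var counter int64
    	var lastWriteTime time.Time
    	return func(resp *filer_pb.SubscribeMetadataResponse) error {
    		if err := processEventFn(resp); err != nil {
    			return err
    		}
    		counter++
    		if lastWriteTime.Add(interval).Before(time.Now()) {
    			lastWriteTime = time.Now()
    			if err := offsetFn(counter, resp.TsNs); err != nil {
    				return err
    			}
    			counter = 0
    		}
    		return nil
    	}
    }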
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index a5d29c451..722f64679 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -52,21 +52,21 @@ type CopyOptions struct {
}
func init() {
- cmdCopy.Run = runCopy // break init cycle
- cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
- copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
- copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
- copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
- copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
- copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
- copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
- copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
- copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying")
+ cmdFilerCopy.Run = runCopy // break init cycle
+ cmdFilerCopy.IsDebug = cmdFilerCopy.Flag.Bool("debug", false, "verbose debug information")
+ copy.include = cmdFilerCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
+ copy.replication = cmdFilerCopy.Flag.String("replication", "", "replication type")
+ copy.collection = cmdFilerCopy.Flag.String("collection", "", "optional collection name")
+ copy.ttl = cmdFilerCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+ copy.diskType = cmdFilerCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ copy.maxMB = cmdFilerCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
+ copy.concurrenctFiles = cmdFilerCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdFilerCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
+ copy.checkSize = cmdFilerCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
+ copy.verbose = cmdFilerCopy.Flag.Bool("verbose", false, "print out details during copying")
}
-var cmdCopy = &Command{
+var cmdFilerCopy = &Command{
UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
Short: "copy one or a list of files to a filer folder",
Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
@@ -154,7 +154,7 @@ func runCopy(cmd *Command, args []string) bool {
}
copy.ttlSec = int32(ttl.Minutes()) * 60
- if *cmdCopy.IsDebug {
+ if *cmdFilerCopy.IsDebug {
grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
@@ -213,11 +213,15 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
mode := fi.Mode()
uid, gid := util.GetFileUidGid(fi)
+ fileSize := fi.Size()
+ if mode.IsDir() {
+ fileSize = 0
+ }
fileCopyTaskChan <- FileCopyTask{
sourceLocation: fileOrDir,
destinationUrlPath: destPath,
- fileSize: fi.Size(),
+ fileSize: fileSize,
fileMode: fi.Mode(),
uid: uid,
gid: gid,
@@ -377,6 +381,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
+ if assignResult.Url == "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
+ }
return nil
})
})
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index ba0b44659..6fe323fba 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "io"
"reflect"
"time"
@@ -53,6 +52,7 @@ The backup writes to another filer store specified in a backup_filer.toml.
func runFilerMetaBackup(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
// load backup_filer.toml
@@ -189,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
- tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "meta_backup",
- PathPrefix: *metaBackup.filerDirectory,
- SinceNs: startTime.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
- return err2
- }
- }
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+ return metaBackup.setOffset(lastTime)
+ })
- }
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
- })
- return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 8451ffd78..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
- "io"
"os"
"path/filepath"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -45,6 +44,7 @@ var (
func runFilerMetaTail(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var filterFunc func(dir, fname string) bool
@@ -103,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
- continue
+ return nil
}
- if err = eachEntryFunc(resp); err != nil {
+ if err := eachEntryFunc(resp); err != nil {
return err
}
- }
+ return nil
+ }, false)
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
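Judging from the call sites in this diff, the shared subscription helper has roughly the following shape; this is an inference from usage, not the committed declaration in weed/pb:

    // Inferred signature: subscribe to metadata events on filerAddress under
    // pathPrefix starting at sinceNs, feeding each event to processEventFn.
    func FollowMetadata(
    	filerAddress string,
    	grpcDialOption grpc.DialOption,
    	clientName string,
    	pathPrefix string,
    	sinceNs int64,
    	selfSignature int32,
    	processEventFn func(resp *filer_pb.SubscribeMetadataResponse) error,
    	fatalOnError bool,
    ) error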
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
new file mode 100644
index 000000000..b7e90b3e7
--- /dev/null
+++ b/weed/command/filer_remote_sync.go
@@ -0,0 +1,257 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "time"
+)
+
+type RemoteSyncOptions struct {
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+}
+
+const (
+ RemoteSyncKeyPrefix = "remote.sync."
+)
+
+var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
+
+func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+}
+func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+var (
+ remoteSyncOptions RemoteSyncOptions
+)
+
+func init() {
+ cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
+ remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
+ remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
+ remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerRemoteSynchronize = &Command{
+ UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
+ Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
+ Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
+
+ filer.remote.sync listens for filer update events.
+ If any mounted remote file is updated, it fetches the updated content
+ and writes it to the remote storage.
+`,
+}
+
+func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ remoteSyncOptions.grpcDialOption = grpcDialOption
+
+ // read filer remote storage mount mappings
+ mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
+ if readErr != nil {
+ fmt.Printf("read mount mapping: %v", readErr)
+ return false
+ }
+
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(
+ *remoteSyncOptions.filerAddress,
+ pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ "/", // does not matter
+ *remoteSyncOptions.readChunkFromFiler,
+ )
+
+ var found bool
+ for dir, remoteStorageMountLocation := range mappings.Mappings {
+ if *remoteSyncOptions.dir == dir {
+ found = true
+ storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
+ if readErr != nil {
+ fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
+ continue
+ }
+ fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
+ if err := util.Retry("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+ }); err != nil {
+ fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
+ }
+ break
+ }
+ }
+ if !found {
+ fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
+ return false
+ }
+
+ return true
+}
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+
+ dirHash := util.HashStringToLong(mountedDir)
+
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ return fmt.Errorf("lookup %s: %v", mountedDir, err)
+ }
+
+ lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if len(message.NewEntry.Chunks) == 0 {
+ return nil
+ }
+ fmt.Printf("create: %+v\n", resp)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping creating: %+v\n", resp)
+ return nil
+ }
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ fmt.Printf("delete: %+v\n", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping updating: %+v\n", resp)
+ return nil
+ }
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
+ fmt.Printf("update meta: %+v\n", resp)
+ return client.UpdateFileMetadata(dest, message.NewEntry)
+ }
+ }
+ fmt.Printf("update: %+v\n", resp)
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+
+ return nil
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+ })
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
+ "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+ source := string(sourcePath[len(mountDir):])
+ dest := util.FullPath(remoteMountLocation.Path).Child(source)
+ return &filer_pb.RemoteStorageLocation{
+ Name: remoteMountLocation.Name,
+ Bucket: remoteMountLocation.Bucket,
+ Path: string(dest),
+ }
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := 0; i < len(a); i++ {
+ x, y := a[i], b[i]
+ if !proto.Equal(x, y) {
+ return false
+ }
+ }
+ return true
+}
+
+func shouldSendToRemote(entry *filer_pb.Entry) bool {
+ if entry.RemoteEntry == nil {
+ return true
+ }
+ if entry.RemoteEntry.LocalMtime < entry.Attributes.Mtime {
+ return true
+ }
+ return false
+}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ entry.RemoteEntry = remoteEntry
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ return err
+ })
+}
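shouldSendToRemote above is the write-back filter: an entry is uploaded when it has never been sent (RemoteEntry == nil) or when the local mtime is newer than the last uploaded copy. An in-package illustration with hypothetical timestamps:

    // Hypothetical timestamps: the local copy (Mtime) is newer than the last
    // upload (LocalMtime), so the entry is written back to remote storage.
    entry := &filer_pb.Entry{
    	Attributes:  &filer_pb.FuseAttributes{Mtime: 1628592400},
    	RemoteEntry: &filer_pb.RemoteEntry{LocalMtime: 1628592300},
    }
    fmt.Println(shouldSendToRemote(entry)) // true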
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 885c95540..bf0a3e140 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 0f34e5701..5440811dd 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
- "io"
"strings"
"time"
)
@@ -71,8 +70,8 @@ func init() {
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
- Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
- Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
+ Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
+ Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
@@ -89,6 +88,7 @@ var cmdFilerSynchronize = &Command{
func runFilerSynchronize(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
@@ -165,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "syncTo_" + targetFiler,
- PathPrefix: sourcePath,
- SinceNs: sourceFilerOffsetTsNs,
- Signature: targetFilerSignature,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
- return err
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
+ sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
}
const (
@@ -359,16 +323,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
return processEventFn
}
-func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
+func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
if !dataSink.IsIncremental() {
- return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
- }
- var mTime int64
- if message.NewEntry != nil {
- mTime = message.NewEntry.Attributes.Mtime
- } else if message.OldEntry != nil {
- mTime = message.OldEntry.Attributes.Mtime
+ key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
+ } else {
+ var mTime int64
+ if message.NewEntry != nil {
+ mTime = message.NewEntry.Attributes.Mtime
+ } else if message.OldEntry != nil {
+ mTime = message.OldEntry.Attributes.Mtime
+ }
+ dateKey := time.Unix(mTime, 0).Format("2006-01-02")
+ key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
}
- dateKey := time.Unix(mTime, 0).Format("2006-01-02")
- return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
+
+ return escapeKey(key)
}
diff --git a/weed/command/filer_sync_std.go b/weed/command/filer_sync_std.go
new file mode 100644
index 000000000..63851eaf8
--- /dev/null
+++ b/weed/command/filer_sync_std.go
@@ -0,0 +1,7 @@
+// +build !windows
+
+package command
+
+func escapeKey(key string) string {
+ return key
+}
diff --git a/weed/command/filer_sync_windows.go b/weed/command/filer_sync_windows.go
new file mode 100644
index 000000000..3d0c9146e
--- /dev/null
+++ b/weed/command/filer_sync_windows.go
@@ -0,0 +1,12 @@
+package command
+
+import (
+ "strings"
+)
+
+func escapeKey(key string) string {
+ if strings.Contains(key, ":") {
+ return strings.ReplaceAll(key, ":", "")
+ }
+ return key
+}
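The Windows variant strips colons because ':' is not a legal character in Windows file names; on all other platforms escapeKey is the identity function (filer_sync_std.go above). For example:

    // A replicated key containing colons (e.g. a time-stamped name) is
    // sanitized before being used as a path component:
    fmt.Println(escapeKey("/backup/2021-08-10/12:45:24.log"))
    // on Windows:       /backup/2021-08-10/124524.log
    // everywhere else:  /backup/2021-08-10/12:45:24.log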
diff --git a/weed/command/fuse.go b/weed/command/fuse.go
index 0a55e509c..a0dcaa86c 100644
--- a/weed/command/fuse.go
+++ b/weed/command/fuse.go
@@ -2,140 +2,232 @@ package command
import (
"fmt"
- "strings"
+ "os"
"strconv"
+ "strings"
"time"
- "os"
)
func init() {
cmdFuse.Run = runFuse // break init cycle
}
+type parameter struct {
+ name string
+ value string
+}
+
func runFuse(cmd *Command, args []string) bool {
- argsLen := len(args)
- options := []string{}
+ rawArgs := strings.Join(args, " ")
+ rawArgsLen := len(rawArgs)
+ option := strings.Builder{}
+ options := []parameter{}
+ masterProcess := true
+ fusermountPath := ""
+
+ // first parameter
+ i := 0
+ for i = 0; i < rawArgsLen && rawArgs[i] != ' '; i++ {
+ option.WriteByte(rawArgs[i])
+ }
+ options = append(options, parameter{"arg0", option.String()})
+ option.Reset()
+
+ for i++; i < rawArgsLen; i++ {
+
+ // space separator: flush any pending option
+ if rawArgs[i] == ' ' {
+ if option.Len() > 0 {
+ options = append(options, parameter{option.String(), "true"})
+ option.Reset()
+ }
- // at least target mount path should be passed
- if argsLen < 1 {
- return false
+ // dash separator read option until next space
+ } else if rawArgs[i] == '-' {
+ for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ {
+ option.WriteByte(rawArgs[i])
+ }
+ options = append(options, parameter{option.String(), "true"})
+ option.Reset()
+
+ // equal separator start option with pending value
+ } else if rawArgs[i] == '=' {
+ name := option.String()
+ option.Reset()
+
+ for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ {
+ // double quote separator read option until next double quote
+ if rawArgs[i] == '"' {
+ for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ {
+ option.WriteByte(rawArgs[i])
+ }
+
+ // single quote separator read option until next single quote
+ } else if rawArgs[i] == '\'' {
+ for i++; i < rawArgsLen && rawArgs[i] != '\''; i++ {
+ option.WriteByte(rawArgs[i])
+ }
+
+ // add chars before comma
+ } else if rawArgs[i] != ' ' {
+ option.WriteByte(rawArgs[i])
+ }
+ }
+
+ options = append(options, parameter{name, option.String()})
+ option.Reset()
+
+ // comma separator: flush the current option
+ } else if rawArgs[i] == ',' {
+ options = append(options, parameter{option.String(), "true"})
+ option.Reset()
+
+ // anything that is not a separator accumulates in the option buffer
+ } else {
+ option.WriteByte(rawArgs[i])
+ }
}
- // first option is always target mount path
- mountOptions.dir = &args[0]
+ // flush any residual option data
+ if option.Len() > 0 {
+ // add value to pending option
+ options = append(options, parameter{option.String(), "true"})
+ option.Reset()
+ }
- // scan parameters looking for one or more -o options
- // -o options receive parameters on format key=value[,key=value]...
- for i := 0; i < argsLen; i++ {
- if args[i] == "-o" && i+1 <= argsLen {
- options = strings.Split(args[i+1], ",")
+ // scan each parameter
+ for i := 0; i < len(options); i++ {
+ parameter := options[i]
+
+ switch parameter.name {
+ case "child":
+ masterProcess = false
+ case "arg0":
+ mountOptions.dir = &parameter.value
+ case "filer":
+ mountOptions.filer = &parameter.value
+ case "filer.path":
+ mountOptions.filerMountRootPath = &parameter.value
+ case "dirAutoCreate":
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+ mountOptions.dirAutoCreate = &parsed
+ } else {
+ panic(fmt.Errorf("dirAutoCreate: %s", err))
+ }
+ case "collection":
+ mountOptions.collection = &parameter.value
+ case "replication":
+ mountOptions.replication = &parameter.value
+ case "disk":
+ mountOptions.diskType = &parameter.value
+ case "ttl":
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+ intValue := int(parsed)
+ mountOptions.ttlSec = &intValue
+ } else {
+ panic(fmt.Errorf("ttl: %s", err))
+ }
+ case "chunkSizeLimitMB":
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+ intValue := int(parsed)
+ mountOptions.chunkSizeLimitMB = &intValue
+ } else {
+ panic(fmt.Errorf("chunkSizeLimitMB: %s", err))
+ }
+ case "concurrentWriters":
i++
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+ intValue := int(parsed)
+ mountOptions.concurrentWriters = &intValue
+ } else {
+ panic(fmt.Errorf("concurrentWriters: %s", err))
+ }
+ case "cacheDir":
+ mountOptions.cacheDir = &parameter.value
+ case "cacheCapacityMB":
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err == nil {
+ mountOptions.cacheSizeMB = &parsed
+ } else {
+ panic(fmt.Errorf("cacheCapacityMB: %s", err))
+ }
+ case "dataCenter":
+ mountOptions.dataCenter = &parameter.value
+ case "allowOthers":
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+ mountOptions.allowOthers = &parsed
+ } else {
+ panic(fmt.Errorf("allowOthers: %s", err))
+ }
+ case "umask":
+ mountOptions.umaskString = &parameter.value
+ case "nonempty":
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+ mountOptions.nonempty = &parsed
+ } else {
+ panic(fmt.Errorf("nonempty: %s", err))
+ }
+ case "volumeServerAccess":
+ mountOptions.volumeServerAccess = &parameter.value
+ case "map.uid":
+ mountOptions.uidMap = &parameter.value
+ case "map.gid":
+ mountOptions.gidMap = &parameter.value
+ case "readOnly":
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+ mountOptions.readOnly = &parsed
+ } else {
+ panic(fmt.Errorf("readOnly: %s", err))
+ }
+ case "cpuprofile":
+ mountCpuProfile = &parameter.value
+ case "memprofile":
+ mountMemProfile = &parameter.value
+ case "readRetryTime":
+ if parsed, err := time.ParseDuration(parameter.value); err == nil {
+ mountReadRetryTime = &parsed
+ } else {
+ panic(fmt.Errorf("readRetryTime: %s", err))
+ }
+ case "fusermount.path":
+ fusermountPath = parameter.value
}
}
- // for each option passed with -o
- for _, option := range options {
- // split just first = character
- parts := strings.SplitN(option, "=", 2)
+ // the master starts the child process, releases it, and then exits
+ if masterProcess {
+ arg0, err := os.Executable()
+ if err != nil {
+ panic(err)
+ }
+
+ argv := append(os.Args, "-o", "child")
+
+ attr := os.ProcAttr{}
+ attr.Env = os.Environ()
+
+ child, err := os.StartProcess(arg0, argv, &attr)
- // if doesn't key and value skip
- if len(parts) != 2 {
- continue
+ if err != nil {
+ panic(fmt.Errorf("master process can not start child process: %s", err))
}
- key, value := parts[0], parts[1]
-
- // switch key keeping "weed mount" parameters
- switch key {
- case "filer":
- mountOptions.filer = &value
- case "filer.path":
- mountOptions.filerMountRootPath = &value
- case "dirAutoCreate":
- if parsed, err := strconv.ParseBool(value); err != nil {
- mountOptions.dirAutoCreate = &parsed
- } else {
- panic(fmt.Errorf("dirAutoCreate: %s", err))
- }
- case "collection":
- mountOptions.collection = &value
- case "replication":
- mountOptions.replication = &value
- case "disk":
- mountOptions.diskType = &value
- case "ttl":
- if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
- intValue := int(parsed)
- mountOptions.ttlSec = &intValue
- } else {
- panic(fmt.Errorf("ttl: %s", err))
- }
- case "chunkSizeLimitMB":
- if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
- intValue := int(parsed)
- mountOptions.chunkSizeLimitMB = &intValue
- } else {
- panic(fmt.Errorf("chunkSizeLimitMB: %s", err))
- }
- case "concurrentWriters":
- if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
- intValue := int(parsed)
- mountOptions.concurrentWriters = &intValue
- } else {
- panic(fmt.Errorf("concurrentWriters: %s", err))
- }
- case "cacheDir":
- mountOptions.cacheDir = &value
- case "cacheCapacityMB":
- if parsed, err := strconv.ParseInt(value, 0, 64); err != nil {
- mountOptions.cacheSizeMB = &parsed
- } else {
- panic(fmt.Errorf("cacheCapacityMB: %s", err))
- }
- case "dataCenter":
- mountOptions.dataCenter = &value
- case "allowOthers":
- if parsed, err := strconv.ParseBool(value); err != nil {
- mountOptions.allowOthers = &parsed
- } else {
- panic(fmt.Errorf("allowOthers: %s", err))
- }
- case "umask":
- mountOptions.umaskString = &value
- case "nonempty":
- if parsed, err := strconv.ParseBool(value); err != nil {
- mountOptions.nonempty = &parsed
- } else {
- panic(fmt.Errorf("nonempty: %s", err))
- }
- case "volumeServerAccess":
- mountOptions.volumeServerAccess = &value
- case "map.uid":
- mountOptions.uidMap = &value
- case "map.gid":
- mountOptions.gidMap = &value
- case "readOnly":
- if parsed, err := strconv.ParseBool(value); err != nil {
- mountOptions.readOnly = &parsed
- } else {
- panic(fmt.Errorf("readOnly: %s", err))
- }
- case "cpuprofile":
- mountCpuProfile = &value
- case "memprofile":
- mountMemProfile = &value
- case "readRetryTime":
- if parsed, err := time.ParseDuration(value); err != nil {
- mountReadRetryTime = &parsed
- } else {
- panic(fmt.Errorf("readRetryTime: %s", err))
- }
+ err = child.Release()
+
+ if err != nil {
+ panic(fmt.Errorf("master process can not release child process: %s", err))
}
+
+ return true
}
- // I don't know why PATH environment variable is lost
- if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin"); err != nil {
- panic(fmt.Errorf("setenv: %s", err))
+ if fusermountPath != "" {
+ if err := os.Setenv("PATH", fusermountPath); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
+ } else if os.Getenv("PATH") == "" {
+ if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
}
// just call "weed mount" command
@@ -144,7 +236,7 @@ func runFuse(cmd *Command, args []string) bool {
var cmdFuse = &Command{
UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
- Short: "Allow using weed with Linux's mount command",
+ Short:  "Allow using weed with Linux's mount command",
Long: `Allow using weed with Linux's mount command
You can use -t weed on mount command:
@@ -160,6 +252,9 @@ var cmdFuse = &Command{
mount -t fuse./home/user/bin/weed fuse /mnt -o "filer=localhost:8888,filer.path=/"
mount -t fuse "/home/user/bin/weed#fuse" /mnt -o "filer=localhost:8888,filer.path=/"
+ To pass more than one parameter, use quotes, for example:
+ mount -t weed fuse /mnt -o "filer='192.168.0.1:8888,192.168.0.2:8888',filer.path=/"
+
To check valid options look "weed mount --help"
`,
}
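The rewritten runFuse also splits into a master and a child process: the master re-executes the binary with an extra "-o child" marker, releases the child, and returns, so mount(8) gets control back while the detached child keeps serving the mount. A minimal standard-library sketch of that pattern (not the committed code):

    package main

    import (
    	"fmt"
    	"os"
    )

    func main() {
    	// child branch: in fuse.go this is where the mount is actually served
    	if len(os.Args) > 1 && os.Args[len(os.Args)-1] == "child" {
    		select {} // stand-in for the long-running FUSE serving loop
    	}

    	// master branch: re-exec ourselves with a marker, detach, and return
    	exe, err := os.Executable()
    	if err != nil {
    		panic(err)
    	}
    	child, err := os.StartProcess(exe, append(os.Args, "child"), &os.ProcAttr{Env: os.Environ()})
    	if err != nil {
    		panic(fmt.Errorf("master process can not start child process: %w", err))
    	}
    	if err := child.Release(); err != nil {
    		panic(fmt.Errorf("master process can not release child process: %w", err))
    	}
    	// the master exits here; the child keeps running detached
    }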
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 17d0832cb..ed4eea543 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -49,6 +49,7 @@ func (iamopt *IamOptions) startIamServer() bool {
return false
}
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/command/imports.go b/weed/command/imports.go
new file mode 100644
index 000000000..ce0bf0e10
--- /dev/null
+++ b/weed/command/imports.go
@@ -0,0 +1,30 @@
+package command
+
+import (
+ _ "net/http/pprof"
+
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
+
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+)
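These blank imports exist purely for their init() side effects: moving the sink imports out of filer_replication.go (above) and collecting all store, sink, and pprof registrations in one file makes every weed command link the same set of drivers. A sketch of the registration pattern the imports rely on (hypothetical driver; filer.Stores is assumed to be the registry the filer consults at startup):

    package mystore // hypothetical filer store driver

    import "github.com/chrislusf/seaweedfs/weed/filer"

    type MyStore struct {
    	filer.FilerStore // sketch: embedding stands in for a full implementation
    }

    func init() {
    	// registering from init() is what makes a blank import sufficient
    	filer.Stores = append(filer.Stores, &MyStore{})
    }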
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index e72a2f2cf..cdf340067 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -5,15 +5,18 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"os/user"
"path"
+ "path/filepath"
"runtime"
"strconv"
"strings"
+ "syscall"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/seaweedfs/fuse"
@@ -49,6 +52,21 @@ func runMount(cmd *Command, args []string) bool {
return RunMount(&mountOptions, os.FileMode(umask))
}
+func getParentInode(mountDir string) (uint64, error) {
+ parentDir := filepath.Clean(filepath.Join(mountDir, ".."))
+ fi, err := os.Stat(parentDir)
+ if err != nil {
+ return 0, err
+ }
+
+ stat, ok := fi.Sys().(*syscall.Stat_t)
+ if !ok {
+ return 0, nil
+ }
+
+ return stat.Ino, nil
+}
+
func RunMount(option *MountOptions, umask os.FileMode) bool {
filers := strings.Split(*option.filer, ",")
@@ -85,13 +103,19 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
filerMountRootPath := *option.filerMountRootPath
dir := util.ResolvePath(*option.dir)
- chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
+ parentInode, err := getParentInode(dir)
+ if err != nil {
+ glog.Errorf("failed to retrieve inode for parent directory of %s: %v", dir, err)
+ return true
+ }
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
+
+ chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
if chunkSizeLimitMB <= 0 {
fmt.Printf("Please specify a reasonable buffer size.")
return false
@@ -199,6 +223,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
+ MountParentInode: parentInode,
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
@@ -221,6 +246,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
server := fs.New(c, nil)
seaweedFileSystem.Server = server
+ seaweedFileSystem.StartBackgroundTasks()
err = server.Serve(seaweedFileSystem)
// check if the mount process has an error to report
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 2d6729bd3..886c0ac5e 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -1,6 +1,8 @@
package command
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/command/scaffold"
"io/ioutil"
"path/filepath"
)
@@ -35,17 +37,17 @@ func runScaffold(cmd *Command, args []string) bool {
content := ""
switch *config {
case "filer":
- content = FILER_TOML_EXAMPLE
+ content = scaffold.Filer
case "notification":
- content = NOTIFICATION_TOML_EXAMPLE
+ content = scaffold.Notification
case "replication":
- content = REPLICATION_TOML_EXAMPLE
+ content = scaffold.Replication
case "security":
- content = SECURITY_TOML_EXAMPLE
+ content = scaffold.Security
case "master":
- content = MASTER_TOML_EXAMPLE
+ content = scaffold.Master
case "shell":
- content = SHELL_TOML_EXAMPLE
+ content = scaffold.Shell
}
if content == "" {
println("need a valid -config option")
@@ -55,519 +57,7 @@ func runScaffold(cmd *Command, args []string) bool {
if *outputPath != "" {
ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644)
} else {
- println(content)
+ fmt.Println(content)
}
return true
}
-
-const (
- FILER_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used with "weed filer" or "weed server -filer"
-# Put this file to one of the location, with descending priority
-# ./filer.toml
-# $HOME/.seaweedfs/filer.toml
-# /etc/seaweedfs/filer.toml
-
-####################################################
-# Customizable filer server options
-####################################################
-[filer.options]
-# with http DELETE, by default the filer would check whether a folder is empty.
-# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
-recursive_delete = false
-# directories under this folder will be automatically creating a separate bucket
-buckets_folder = "/buckets"
-
-####################################################
-# The following are filer store options
-####################################################
-
-[leveldb2]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-# faster than previous leveldb, recommended.
-enabled = true
-dir = "./filerldb2" # directory to store level db files
-
-[leveldb3]
-# similar to leveldb2.
-# each bucket has its own meta store.
-enabled = false
-dir = "./filerldb3" # directory to store level db files
-
-[rocksdb]
-# local on disk, similar to leveldb
-# since it is using a C wrapper, you need to install rocksdb and build it by yourself
-enabled = false
-dir = "./filerrdb" # directory to store rocksdb files
-
-[sqlite]
-# local on disk, similar to leveldb
-enabled = false
-dbFile = "./filer.db" # sqlite db file
-
-[mysql] # or memsql, tidb
-# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
-# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
-# directory TEXT COMMENT 'full path to parent directory',
-# meta LONGBLOB,
-# PRIMARY KEY (dirhash, name)
-# ) DEFAULT CHARSET=utf8;
-
-enabled = false
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = "" # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[mysql2] # or memsql, tidb
-enabled = false
-createTable = """
- CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
- dirhash BIGINT,
- name VARCHAR(1000) BINARY,
- directory TEXT,
- meta LONGBLOB,
- PRIMARY KEY (dirhash, name)
- ) DEFAULT CHARSET=utf8;
-"""
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = "" # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[postgres] # or cockroachdb, YugabyteDB
-# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT,
-# name VARCHAR(65535),
-# directory VARCHAR(65535),
-# meta bytea,
-# PRIMARY KEY (dirhash, name)
-# );
-enabled = false
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres" # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[postgres2]
-enabled = false
-createTable = """
- CREATE TABLE IF NOT EXISTS "%s" (
- dirhash BIGINT,
- name VARCHAR(65535),
- directory VARCHAR(65535),
- meta bytea,
- PRIMARY KEY (dirhash, name)
- );
-"""
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres" # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[cassandra]
-# CREATE TABLE filemeta (
-# directory varchar,
-# name varchar,
-# meta blob,
-# PRIMARY KEY (directory, name)
-# ) WITH CLUSTERING ORDER BY (name ASC);
-enabled = false
-keyspace="seaweedfs"
-hosts=[
- "localhost:9042",
-]
-username=""
-password=""
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[hbase]
-enabled = false
-zkquorum = ""
-table = "seaweedfs"
-
-[redis2]
-enabled = false
-address = "localhost:6379"
-password = ""
-database = 0
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[redis_cluster2]
-enabled = false
-addresses = [
- "localhost:30001",
- "localhost:30002",
- "localhost:30003",
- "localhost:30004",
- "localhost:30005",
- "localhost:30006",
-]
-password = ""
-# allows reads from slave servers or the master, but all writes still go to the master
-readOnly = false
-# automatically use the closest Redis server for reads
-routeByLatency = false
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[etcd]
-enabled = false
-servers = "localhost:2379"
-timeout = "3s"
-
-[mongodb]
-enabled = false
-uri = "mongodb://localhost:27017"
-option_pool_size = 0
-database = "seaweedfs"
-
-[elastic7]
-enabled = false
-servers = [
- "http://localhost1:9200",
- "http://localhost2:9200",
- "http://localhost3:9200",
-]
-username = ""
-password = ""
-sniff_enabled = false
-healthcheck_enabled = false
-# increase the value is recommend, be sure the value in Elastic is greater or equal here
-index.max_result_window = 10000
-
-
-
-##########################
-##########################
-# To add path-specific filer store:
-#
-# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
-# 2. Add a location configuraiton. E.g., location = "/tmp/"
-# 3. Copy and customize all other configurations.
-# Make sure they are not the same if using the same store type!
-# 4. Set enabled to true
-#
-# The following is just using redis as an example
-##########################
-[redis2.tmp]
-enabled = false
-location = "/tmp/"
-address = "localhost:6379"
-password = ""
-database = 1
-
-`
-
- NOTIFICATION_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate"
-# Put this file to one of the location, with descending priority
-# ./notification.toml
-# $HOME/.seaweedfs/notification.toml
-# /etc/seaweedfs/notification.toml
-
-####################################################
-# notification
-# send and receive filer updates for each file to an external message queue
-####################################################
-[notification.log]
-# this is only for debugging perpose and does not work with "weed filer.replicate"
-enabled = false
-
-
-[notification.kafka]
-enabled = false
-hosts = [
- "localhost:9092"
-]
-topic = "seaweedfs_filer"
-offsetFile = "./last.offset"
-offsetSaveIntervalSeconds = 10
-
-
-[notification.aws_sqs]
-# experimental, let me know if it works
-enabled = false
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-sqs_queue_name = "my_filer_queue" # an existing queue name
-
-
-[notification.google_pub_sub]
-# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json" # path to json credential file
-project_id = "" # an existing project id
-topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
-
-[notification.gocdk_pub_sub]
-# The Go Cloud Development Kit (https://gocloud.dev).
-# PubSub API (https://godoc.org/gocloud.dev/pubsub).
-# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
-enabled = false
-# This URL will Dial the RabbitMQ server at the URL in the environment
-# variable RABBIT_SERVER_URL and open the exchange "myexchange".
-# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
-# create binding myexchange => myqueue
-topic_url = "rabbit://myexchange"
-sub_url = "rabbit://myqueue"
-`
-
- REPLICATION_TOML_EXAMPLE = `
-# A sample TOML config file for replicating SeaweedFS filer
-# Used with "weed filer.backup"
-# Using with "weed filer.replicate" is deprecated.
-# Put this file to one of the location, with descending priority
-# ./replication.toml
-# $HOME/.seaweedfs/replication.toml
-# /etc/seaweedfs/replication.toml
-
-[source.filer] # deprecated. Only useful with "weed filer.replicate"
-enabled = true
-grpcAddress = "localhost:18888"
-# all files under this directory tree are replicated.
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all files with this "prefix" are sent to notification message queue.
-directory = "/buckets"
-
-[sink.local]
-enabled = false
-directory = "/data"
-# all replicated files are under modified time as yyyy-mm-dd directories
-# so each date directory contains all new and updated files.
-is_incremental = false
-
-[sink.filer]
-enabled = false
-grpcAddress = "localhost:18888"
-# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
-replication = ""
-collection = ""
-ttlSec = 0
-is_incremental = false
-
-[sink.s3]
-# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
-enabled = false
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name" # an existing bucket
-directory = "/" # destination directory
-endpoint = ""
-is_incremental = false
-
-[sink.google_cloud_storage]
-# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json" # path to json credential file
-bucket = "your_bucket_seaweedfs" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.azure]
-# experimental, let me know if it works
-enabled = false
-account_name = ""
-account_key = ""
-container = "mycontainer" # an existing container
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.backblaze]
-enabled = false
-b2_account_id = ""
-b2_master_application_key = ""
-bucket = "mybucket" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-`
-
- SECURITY_TOML_EXAMPLE = `
-# Put this file to one of the location, with descending priority
-# ./security.toml
-# $HOME/.seaweedfs/security.toml
-# /etc/seaweedfs/security.toml
-# this file is read by master, volume server, and filer
-
-# the jwt signing key is read by master and volume server.
-# a jwt defaults to expire after 10 seconds.
-[jwt.signing]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# jwt for read is only supported with master+volume setup. Filer does not support this mode.
-[jwt.signing.read]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# all grpc tls authentications are mutual
-# the values for the following ca, cert, and key are paths to the PERM files.
-# the host name is not checked, so the PERM files can be shared.
-[grpc]
-ca = ""
-# Set wildcard domain for enable TLS authentication by common names
-allowed_wildcard_domain = "" # .mycompany.com
-
-[grpc.volume]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.master]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.filer]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.msg_broker]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-# use this for any place needs a grpc client
-# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
-[grpc.client]
-cert = ""
-key = ""
-
-# volume server https options
-# Note: work in progress!
-# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
-[https.client]
-enabled = true
-[https.volume]
-cert = ""
-key = ""
-
-
-`
-
- MASTER_TOML_EXAMPLE = `
-# Put this file to one of the location, with descending priority
-# ./master.toml
-# $HOME/.seaweedfs/master.toml
-# /etc/seaweedfs/master.toml
-# this file is read by master
-
-[master.maintenance]
-# periodically run these scripts are the same as running them from 'weed shell'
-scripts = """
- lock
- ec.encode -fullPercent=95 -quietFor=1h
- ec.rebuild -force
- ec.balance -force
- volume.balance -force
- volume.fix.replication
- unlock
-"""
-sleep_minutes = 17 # sleep minutes between each script execution
-
-[master.filer]
-default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
-
-
-[master.sequencer]
-type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
-# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
-# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = "http://127.0.0.1:2379"
-
-
-# configurations for tiered cloud storage
-# old volumes are transparently moved to cloud for cost efficiency
-[storage.backend]
- [storage.backend.s3.default]
- enabled = false
- aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
- aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
- region = "us-east-2"
- bucket = "your_bucket_name" # an existing bucket
- endpoint = ""
-
-# create this number of logical volumes if no more writable volumes
-# count_x means how many copies of data.
-# e.g.:
-# 000 has only one copy, copy_1
-# 010 and 001 has two copies, copy_2
-# 011 has only 3 copies, copy_3
-[master.volume_growth]
-copy_1 = 7 # create 1 x 7 = 7 actual volumes
-copy_2 = 6 # create 2 x 6 = 12 actual volumes
-copy_3 = 3 # create 3 x 3 = 9 actual volumes
-copy_other = 1 # create n x 1 = n actual volumes
-
-# configuration flags for replication
-[master.replication]
-# any replication counts should be considered minimums. If you specify 010 and
-# have 3 different racks, that's still considered writable. Writes will still
-# try to replicate to all available volumes. You should only use this option
-# if you are doing your own replication or periodic sync of volumes.
-treat_replication_as_minimums = false
-
-`
- SHELL_TOML_EXAMPLE = `
-
-[cluster]
-default = "c1"
-
-[cluster.c1]
-master = "localhost:9333" # comma-separated master servers
-filer = "localhost:8888" # filer host and port
-
-[cluster.c2]
-master = ""
-filer = ""
-
-`
-)
diff --git a/weed/command/scaffold/example.go b/weed/command/scaffold/example.go
new file mode 100644
index 000000000..6be6804e5
--- /dev/null
+++ b/weed/command/scaffold/example.go
@@ -0,0 +1,21 @@
+package scaffold
+
+import _ "embed"
+
+//go:embed filer.toml
+var Filer string
+
+//go:embed notification.toml
+var Notification string
+
+//go:embed replication.toml
+var Replication string
+
+//go:embed security.toml
+var Security string
+
+//go:embed master.toml
+var Master string
+
+//go:embed shell.toml
+var Shell string
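
The `//go:embed` directive (Go 1.16+) compiles the named file's contents into the binary at build time, which is what lets these scaffold templates live as plain .toml files instead of Go string constants. A minimal standalone sketch of the same pattern, using a hypothetical `template.toml` next to the source file:

```go
package main

import (
	_ "embed" // the blank import is required for any //go:embed directive
	"fmt"
)

//go:embed template.toml
var template string // filled with the file's contents at build time

func main() {
	// At runtime the embedded text is an ordinary string.
	fmt.Print(template)
}
```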
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
new file mode 100644
index 000000000..9e9258865
--- /dev/null
+++ b/weed/command/scaffold/filer.toml
@@ -0,0 +1,232 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used with "weed filer" or "weed server -filer"
+# Put this file in one of the following locations, with descending priority
+# ./filer.toml
+# $HOME/.seaweedfs/filer.toml
+# /etc/seaweedfs/filer.toml
+
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer checks whether a folder is empty.
+# recursive_delete will delete all subfolders and files, similar to "rm -Rf"
+recursive_delete = false
+
+####################################################
+# The following are filer store options
+####################################################
+
+[leveldb2]
+# local on disk, mostly for a simple single-machine setup, fairly scalable
+# faster than the previous leveldb; recommended.
+enabled = true
+dir = "./filerldb2" # directory to store level db files
+
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it uses a C wrapper, you need to install rocksdb and build it yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
+
+[sqlite]
+# local on disk, similar to leveldb
+enabled = false
+dbFile = "./filer.db" # sqlite db file
+
+[mysql] # or memsql, tidb
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
+# directory TEXT COMMENT 'full path to parent directory',
+# meta LONGBLOB,
+# PRIMARY KEY (dirhash, name)
+# ) DEFAULT CHARSET=utf8;
+
+enabled = false
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[mysql2] # or memsql, tidb
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS `%s` (
+ dirhash BIGINT,
+ name VARCHAR(1000) BINARY,
+ directory TEXT,
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+ ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[postgres] # or cockroachdb, YugabyteDB
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT,
+# name VARCHAR(65535),
+# directory VARCHAR(65535),
+# meta bytea,
+# PRIMARY KEY (dirhash, name)
+# );
+enabled = false
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[postgres2]
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS "%s" (
+ dirhash BIGINT,
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
+ meta bytea,
+ PRIMARY KEY (dirhash, name)
+ );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[cassandra]
+# CREATE TABLE filemeta (
+# directory varchar,
+# name varchar,
+# meta blob,
+# PRIMARY KEY (directory, name)
+# ) WITH CLUSTERING ORDER BY (name ASC);
+enabled = false
+keyspace = "seaweedfs"
+hosts = [
+ "localhost:9042",
+]
+username = ""
+password = ""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+# Name of the datacenter local to this filer, used as host selection fallback.
+localDC = ""
+
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
+
+[redis2]
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[redis_cluster2]
+enabled = false
+addresses = [
+ "localhost:30001",
+ "localhost:30002",
+ "localhost:30003",
+ "localhost:30004",
+ "localhost:30005",
+ "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
+[mongodb]
+enabled = false
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+ "http://localhost1:9200",
+ "http://localhost2:9200",
+ "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increasing this value is recommended; make sure the value in Elastic is greater than or equal to the value here
+index.max_result_window = 10000
+
+
+
+##########################
+##########################
+# To add a path-specific filer store:
+#
+# 1. Add a name following the store type, separated by a dot ".". E.g., cassandra.tmp
+# 2. Add a location configuration. E.g., location = "/tmp/"
+# 3. Copy and customize all other configurations.
+# Make sure they are not the same if using the same store type!
+# 4. Set enabled to true
+#
+# The following is just using redis as an example
+##########################
+[redis2.tmp]
+enabled = false
+location = "/tmp/"
+address = "localhost:6379"
+password = ""
+database = 1
diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml
new file mode 100644
index 000000000..020f48e36
--- /dev/null
+++ b/weed/command/scaffold/master.toml
@@ -0,0 +1,63 @@
+# Put this file in one of the following locations, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# periodically running these scripts is the same as running them from 'weed shell'
+scripts = """
+ lock
+ ec.encode -fullPercent=95 -quietFor=1h
+ ec.rebuild -force
+ ec.balance -force
+ volume.deleteEmpty -quietFor=24h -force
+ volume.balance -force
+ volume.fix.replication
+ unlock
+"""
+sleep_minutes = 17 # sleep minutes between each script execution
+
+[master.filer]
+default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
+
+
+[master.sequencer]
+type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
+# when sequencer.type = etcd, set the client listen urls of the etcd cluster that stores the file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+# when sequencer.type = snowflake, the snowflake id must be different on each master
+sequencer_snowflake_id = 0 # any number between 1 and 1023
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+[storage.backend.s3.default]
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+endpoint = ""
+
+# create this number of logical volumes if there are no more writable volumes
+# copy_x means how many copies of the data.
+# e.g.:
+# 000 has only one copy, copy_1
+# 010 and 001 have two copies, copy_2
+# 011 has three copies, copy_3
+[master.volume_growth]
+copy_1 = 7 # create 1 x 7 = 7 actual volumes
+copy_2 = 6 # create 2 x 6 = 12 actual volumes
+copy_3 = 3 # create 3 x 3 = 9 actual volumes
+copy_other = 1 # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
diff --git a/weed/command/scaffold/notification.toml b/weed/command/scaffold/notification.toml
new file mode 100644
index 000000000..f35101edd
--- /dev/null
+++ b/weed/command/scaffold/notification.toml
@@ -0,0 +1,54 @@
+# A sample TOML config file for SeaweedFS filer notifications
+# Used by "weed filer" or "weed server -filer", and by "weed filer.replicate"
+# Put this file in one of the following locations, with descending priority
+# ./notification.toml
+# $HOME/.seaweedfs/notification.toml
+# /etc/seaweedfs/notification.toml
+
+####################################################
+# notification
+# send and receive filer updates for each file to an external message queue
+####################################################
+[notification.log]
+# this is only for debugging purposes and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.kafka]
+enabled = false
+hosts = [
+ "localhost:9092"
+]
+topic = "seaweedfs_filer"
+offsetFile = "./last.offset"
+offsetSaveIntervalSeconds = 10
+
+
+[notification.aws_sqs]
+# experimental, let me know if it works
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue" # an existing queue name
+
+
+[notification.google_pub_sub]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+project_id = "" # an existing project id
+topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL dials the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and opens the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin: create myexchange of type fanout and myqueue,
+# then create the binding myexchange => myqueue.
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml
new file mode 100644
index 000000000..c463c8077
--- /dev/null
+++ b/weed/command/scaffold/replication.toml
@@ -0,0 +1,71 @@
+# A sample TOML config file for replicating SeaweedFS filer
+# Used with "weed filer.backup"
+# Using with "weed filer.replicate" is deprecated.
+# Put this file to one of the location, with descending priority
+# ./replication.toml
+# $HOME/.seaweedfs/replication.toml
+# /etc/seaweedfs/replication.toml
+
+[source.filer] # deprecated. Only useful with "weed filer.replicate"
+enabled = true
+grpcAddress = "localhost:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to the notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = false
+directory = "/data"
+# all replicated files are placed under yyyy-mm-dd directories named by modified time,
+# so each date directory contains all new and updated files.
+is_incremental = false
+
+[sink.filer]
+enabled = false
+grpcAddress = "localhost:18888"
+# all replicated files are under this directory tree
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all received files will be "prefixed" with this directory.
+directory = "/backup"
+replication = ""
+collection = ""
+ttlSec = 0
+is_incremental = false
+
+[sink.s3]
+# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
+# default loads credentials from the shared credentials file (~/.aws/credentials).
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+directory = "/" # destination directory
+endpoint = ""
+is_incremental = false
+
+[sink.google_cloud_storage]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+bucket = "your_bucket_seaweedfs" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.azure]
+# experimental, let me know if it works
+enabled = false
+account_name = ""
+account_key = ""
+container = "mycontainer" # an existing container
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.backblaze]
+enabled = false
+b2_account_id = ""
+b2_master_application_key = ""
+bucket = "mybucket" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
new file mode 100644
index 000000000..0c69b2f24
--- /dev/null
+++ b/weed/command/scaffold/security.toml
@@ -0,0 +1,60 @@
+# Put this file in one of the following locations, with descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt defaults to expire after 10 seconds.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# jwt for read is only supported with master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10 # seconds
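
To illustrate the short-lived token model (a generic HS256 sketch, not SeaweedFS's exact claim layout or JWT library usage), a token that expires after expires_after_seconds could be minted like so with github.com/golang-jwt/jwt:

```go
package example

import (
	"time"

	"github.com/golang-jwt/jwt"
)

// mintToken signs a token that verifiers would reject once ttl has elapsed.
func mintToken(key string, ttl time.Duration) (string, error) {
	claims := jwt.MapClaims{
		"exp": time.Now().Add(ttl).Unix(), // e.g., 10 seconds from now
	}
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString([]byte(key))
}
```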
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PEM files.
+# the host name is not checked, so the PEM files can be shared.
+[grpc]
+ca = ""
+# Set a wildcard domain to enable TLS authentication by common names
+allowed_wildcard_domain = "" # .mycompany.com
+
+[grpc.volume]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.master]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.filer]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.msg_broker]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+# use this for any place that needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+# volume server https options
+# Note: work in progress!
+# this does not yet work with other clients, e.g., "weed filer|mount".
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
diff --git a/weed/command/scaffold/shell.toml b/weed/command/scaffold/shell.toml
new file mode 100644
index 000000000..288ae2efe
--- /dev/null
+++ b/weed/command/scaffold/shell.toml
@@ -0,0 +1,10 @@
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
diff --git a/weed/command/server.go b/weed/command/server.go
index d0020d33b..fe10b24f7 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -3,6 +3,7 @@ package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/util/grace"
+ "net/http"
"os"
"strings"
"time"
@@ -16,6 +17,8 @@ import (
type ServerOptions struct {
cpuprofile *string
memprofile *string
+ debug *bool
+ debugPort *int
v VolumeServerOptions
}
@@ -78,6 +81,8 @@ var (
func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
+ serverOptions.debug = cmdServer.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2")
+ serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")
masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@@ -107,10 +112,11 @@ func init() {
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+	serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+ serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
@@ -139,6 +145,10 @@ func init() {
func runServer(cmd *Command, args []string) bool {
+ if *serverOptions.debug {
+ go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil)
+ }
+
util.LoadConfiguration("security", false)
util.LoadConfiguration("master", false)
@@ -245,7 +255,7 @@ func runServer(cmd *Command, args []string) bool {
// start volume server
if *isStartingVolumeServer {
- minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent)
+ minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent)
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, minFreeSpaces)
}
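
The -debug flag above works because passing a nil handler to http.ListenAndServe serves http.DefaultServeMux, where the standard net/http/pprof package registers its /debug/pprof/* handlers via a side-effect import (presumably pulled in elsewhere in the binary). The pattern in isolation:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

func main() {
	// With a nil handler, ListenAndServe uses http.DefaultServeMux,
	// so http://localhost:6060/debug/pprof/goroutine?debug=2 works.
	log.Println(http.ListenAndServe(":6060", nil))
}
```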
diff --git a/weed/command/shell.go b/weed/command/shell.go
index c9976e809..4a9f4b027 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -55,6 +55,7 @@ func runShell(command *Command, args []string) bool {
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ shellOptions.FilerAddress = *shellInitialFiler
if err != nil {
fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
return false
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 0f9361b40..9ae1befab 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -71,20 +71,20 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master)
if err != nil {
fmt.Printf("upload: %v", err)
return false
}
if *upload.replication == "" {
- *upload.replication = defaultCollection
+ *upload.replication = defaultReplication
}
if len(args) == 0 {
if *upload.dir == "" {
return false
}
- filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
+ err = filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
if err == nil {
if !info.IsDir() {
if *upload.include != "" {
@@ -108,12 +108,21 @@ func runUpload(cmd *Command, args []string) bool {
}
return err
})
+ if err != nil {
+ fmt.Println(err.Error())
+ return false
+ }
} else {
parts, e := operation.NewFileParts(args)
if e != nil {
fmt.Println(e.Error())
+ return false
+ }
+ results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ if err != nil {
+ fmt.Println(err.Error())
+ return false
}
- results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
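
The upload.go hunk above stops discarding filepath.Walk's return value: returning a non-nil error from the walk function aborts the traversal, and that error becomes Walk's return value. A standalone sketch of the behavior:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err // propagating the error aborts the walk; Walk returns it
		}
		fmt.Println(path)
		return nil
	})
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}
```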
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 139a3791e..235eff11b 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -35,31 +35,32 @@ var (
)
type VolumeServerOptions struct {
- port *int
- publicPort *int
- folders []string
- folderMaxLimits []int
- idxFolder *string
- ip *string
- publicUrl *string
- bindIp *string
- masters *string
- idleConnectionTimeout *int
- dataCenter *string
- rack *string
- whiteList []string
- indexType *string
- diskType *string
- fixJpgOrientation *bool
- readRedirect *bool
- cpuProfile *string
- memProfile *string
- compactionMBPerSecond *int
- fileSizeLimitMB *int
- concurrentUploadLimitMB *int
- pprof *bool
- preStopSeconds *int
- metricsHttpPort *int
+ port *int
+ publicPort *int
+ folders []string
+ folderMaxLimits []int
+ idxFolder *string
+ ip *string
+ publicUrl *string
+ bindIp *string
+ masters *string
+ idleConnectionTimeout *int
+ dataCenter *string
+ rack *string
+ whiteList []string
+ indexType *string
+ diskType *string
+ fixJpgOrientation *bool
+ readMode *string
+ cpuProfile *string
+ memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ concurrentUploadLimitMB *int
+ concurrentDownloadLimitMB *int
+ pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
enableTcp *bool
}
@@ -80,12 +81,13 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
+ v.readMode = cmdVolume.Flag.String("readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
- v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
+ v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
@@ -228,10 +230,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList,
- *v.fixJpgOrientation, *v.readRedirect,
+ *v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
+ int64(*v.concurrentDownloadLimitMB)*1024*1024,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -259,6 +262,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// Stop heartbeats
if !volumeServer.StopHeartbeat() {
+ volumeServer.SetStopping()
glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
}