author    hilimd <68371223+hilimd@users.noreply.github.com>  2021-08-16 00:54:51 +0800
committer GitHub <noreply@github.com>  2021-08-16 00:54:51 +0800
commit    27c05f8c0b5c7bda43babeb61d79684d11851111 (patch)
tree      d235573112ce168ca904acbc3932ed12e94de80c /weed/command
parent    97ad3e9d027216d74132652d4d899c7fc7c33ab1 (diff)
parent    ec989b037717f8fd7f0ed3bbc80f0a33654fe7aa (diff)
download  seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.tar.xz
          seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.zip
Merge pull request #80 from chrislusf/master
sync
Diffstat (limited to 'weed/command')
-rw-r--r--  weed/command/autocomplete.go | 109
-rw-r--r--  weed/command/backup.go | 2
-rw-r--r--  weed/command/benchmark.go | 2
-rw-r--r--  weed/command/command.go | 9
-rw-r--r--  weed/command/download.go | 21
-rw-r--r--  weed/command/filer.go | 7
-rw-r--r--  weed/command/filer_backup.go | 55
-rw-r--r--  weed/command/filer_copy.go | 37
-rw-r--r--  weed/command/filer_meta_backup.go | 49
-rw-r--r--  weed/command/filer_meta_tail.go | 37
-rw-r--r--  weed/command/filer_remote_sync.go | 257
-rw-r--r--  weed/command/filer_replication.go | 6
-rw-r--r--  weed/command/filer_sync.go | 54
-rw-r--r--  weed/command/fuse.go | 86
-rw-r--r--  weed/command/gateway.go | 93
-rw-r--r--  weed/command/iam.go | 1
-rw-r--r--  weed/command/imports.go | 30
-rw-r--r--  weed/command/master.go | 4
-rw-r--r--  weed/command/master_follower.go | 143
-rw-r--r--  weed/command/mount_std.go | 1
-rw-r--r--  weed/command/scaffold.go | 528
-rw-r--r--  weed/command/scaffold/example.go | 21
-rw-r--r--  weed/command/scaffold/filer.toml | 232
-rw-r--r--  weed/command/scaffold/master.toml | 63
-rw-r--r--  weed/command/scaffold/notification.toml | 54
-rw-r--r--  weed/command/scaffold/replication.toml | 71
-rw-r--r--  weed/command/scaffold/security.toml | 60
-rw-r--r--  weed/command/scaffold/shell.toml | 10
-rw-r--r--  weed/command/server.go | 14
-rw-r--r--  weed/command/shell.go | 1
-rw-r--r--  weed/command/upload.go | 17
-rw-r--r--  weed/command/volume.go | 60
32 files changed, 1267 insertions(+), 867 deletions(-)
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go
new file mode 100644
index 000000000..9a545a183
--- /dev/null
+++ b/weed/command/autocomplete.go
@@ -0,0 +1,109 @@
+package command
+
+import (
+ "fmt"
+ flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
+ "github.com/posener/complete"
+ completeinstall "github.com/posener/complete/cmd/install"
+ "runtime"
+)
+
+func AutocompleteMain(commands []*Command) bool {
+ subCommands := make(map[string]complete.Command)
+ helpSubCommands := make(map[string]complete.Command)
+ for _, cmd := range commands {
+ flags := make(map[string]complete.Predictor)
+ cmd.Flag.VisitAll(func(flag *flag.Flag) {
+ flags["-"+flag.Name] = complete.PredictAnything
+ })
+
+ subCommands[cmd.Name()] = complete.Command{
+ Flags: flags,
+ }
+ helpSubCommands[cmd.Name()] = complete.Command{}
+ }
+ subCommands["help"] = complete.Command{Sub: helpSubCommands}
+
+ globalFlags := make(map[string]complete.Predictor)
+ flag.VisitAll(func(flag *flag.Flag) {
+ globalFlags["-"+flag.Name] = complete.PredictAnything
+ })
+
+ weedCmd := complete.Command{
+ Sub: subCommands,
+ Flags: globalFlags,
+ GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
+ }
+ cmp := complete.New("weed", weedCmd)
+
+ return cmp.Complete()
+}
+
+func installAutoCompletion() bool {
+ if runtime.GOOS == "windows" {
+ fmt.Println("windows is not supported")
+ return false
+ }
+
+ err := completeinstall.Install("weed")
+ if err != nil {
+ fmt.Printf("install failed! %s\n", err)
+ return false
+ }
+ fmt.Printf("autocompletion is enabled. Please restart your shell.\n")
+ return true
+}
+
+func uninstallAutoCompletion() bool {
+ if runtime.GOOS == "windows" {
+ fmt.Println("windows is not supported")
+ return false
+ }
+
+ err := completeinstall.Uninstall("weed")
+ if err != nil {
+ fmt.Printf("uninstall failed! %s\n", err)
+ return false
+ }
+ fmt.Printf("autocompletion is disable. Please restart your shell.\n")
+ return true
+}
+
+var cmdAutocomplete = &Command{
+ Run: runAutocomplete,
+ UsageLine: "autocomplete",
+ Short: "install autocomplete",
+ Long: `weed autocomplete is installed in the shell.
+
+ Supported shells are bash, zsh, and fish.
+ Windows is not supported.
+
+`,
+}
+
+func runAutocomplete(cmd *Command, args []string) bool {
+ if len(args) != 0 {
+ cmd.Usage()
+ }
+
+ return installAutoCompletion()
+}
+
+var cmdUnautocomplete = &Command{
+ Run: runUnautocomplete,
+ UsageLine: "autocomplete.uninstall",
+ Short: "uninstall autocomplete",
+ Long: `weed autocomplete is uninstalled in the shell.
+
+ Windows is not supported.
+
+`,
+}
+
+func runUnautocomplete(cmd *Command, args []string) bool {
+ if len(args) != 0 {
+ cmd.Usage()
+ }
+
+ return uninstallAutoCompletion()
+}
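
The new autocomplete.go above feeds every registered command and flag into github.com/posener/complete, which only produces output when the shell invokes the binary during TAB completion. Below is a minimal sketch of that mechanism on a toy command tree; the simulated COMP_LINE/COMP_POINT environment variables and the toy command names are illustrative assumptions, not part of the commit.

package main

import (
	"fmt"
	"os"

	"github.com/posener/complete"
)

func main() {
	// Toy command tree, analogous to what AutocompleteMain builds for "weed".
	cmd := complete.Command{
		Sub: complete.Commands{
			"upload":   {Flags: complete.Flags{"-dir": complete.PredictDirs("*")}},
			"download": {Flags: complete.Flags{"-o": complete.PredictFiles("*")}},
		},
		GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
	}
	cmp := complete.New("toy", cmd)

	// Normally the shell sets these when the user presses TAB; simulated here
	// so the sketch can be run directly. "toy up" should complete to "upload".
	os.Setenv("COMP_LINE", "toy up")
	os.Setenv("COMP_POINT", "6")

	if cmp.Complete() {
		return // completion candidates were printed to stdout
	}
	fmt.Println("not invoked for completion; run the normal CLI here")
}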
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 207df770b..4c5a2d820 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
- lookup, err := operation.Lookup(func() string { return *s.master }, vid.String())
+ lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 4fedb55f1..f0c8f6139 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -212,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
}
var jwtAuthorization security.EncodedJwt
if isSecure {
- jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
+ jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid)
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
diff --git a/weed/command/command.go b/weed/command/command.go
index 18e53ad8c..8d6525652 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -8,23 +8,26 @@ import (
)
var Commands = []*Command{
- cmdBenchmark,
+ cmdAutocomplete,
+ cmdUnautocomplete,
cmdBackup,
+ cmdBenchmark,
cmdCompact,
- cmdCopy,
cmdDownload,
cmdExport,
cmdFiler,
cmdFilerBackup,
cmdFilerCat,
+ cmdFilerCopy,
cmdFilerMetaBackup,
cmdFilerMetaTail,
+ cmdFilerRemoteSynchronize,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
cmdFuse,
- cmdGateway,
cmdMaster,
+ cmdMasterFollower,
cmdMount,
cmdS3,
cmdIam,
diff --git a/weed/command/download.go b/weed/command/download.go
index 7bbff9448..a64d3f237 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -2,6 +2,8 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "google.golang.org/grpc"
"io"
"io/ioutil"
"net/http"
@@ -43,20 +45,23 @@ var cmdDownload = &Command{
}
func runDownload(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
for _, fid := range args {
- if e := downloadToFile(func() string { return *d.server }, fid, util.ResolvePath(*d.dir)); e != nil {
+ if e := downloadToFile(func() string { return *d.server }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
return true
}
-func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) error {
- fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
+func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, fileId, saveDir string) error {
+ fileUrl, jwt, lookupError := operation.LookupFileId(masterFn, grpcDialOption, fileId)
if lookupError != nil {
return lookupError
}
- filename, _, rc, err := util.DownloadFile(fileUrl)
+ filename, _, rc, err := util.DownloadFile(fileUrl, jwt)
if err != nil {
return err
}
@@ -83,7 +88,7 @@ func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) erro
fids := strings.Split(string(content), "\n")
for _, partId := range fids {
var n int
- _, part, err := fetchContent(masterFn, partId)
+ _, part, err := fetchContent(masterFn, grpcDialOption, partId)
if err == nil {
n, err = f.Write(part)
}
@@ -103,13 +108,13 @@ func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) erro
return nil
}
-func fetchContent(masterFn operation.GetMasterFn, fileId string) (filename string, content []byte, e error) {
- fileUrl, lookupError := operation.LookupFileId(masterFn, fileId)
+func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (filename string, content []byte, e error) {
+ fileUrl, jwt, lookupError := operation.LookupFileId(masterFn, grpcDialOption, fileId)
if lookupError != nil {
return "", nil, lookupError
}
var rc *http.Response
- if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil {
+ if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil {
return "", nil, e
}
defer util.CloseResponse(rc)
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a723b4d8a..ddee0852c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -50,6 +50,8 @@ type FilerOptions struct {
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
+ debug *bool
+ debugPort *int
}
func init() {
@@ -73,6 +75,8 @@ func init() {
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
+ f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -122,6 +126,9 @@ var cmdFiler = &Command{
}
func runFiler(cmd *Command, args []string) bool {
+ if *f.debug {
+ go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
+ }
util.LoadConfiguration("security", false)
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 888b46fe7..0c450181b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -52,11 +49,11 @@ var cmdFilerBackup = &Command{
func runFilerBackup(cmd *Command, args []string) bool {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
if err != nil {
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index a5d29c451..722f64679 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -52,21 +52,21 @@ type CopyOptions struct {
}
func init() {
- cmdCopy.Run = runCopy // break init cycle
- cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
- copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
- copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
- copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
- copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
- copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
- copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
- copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
- copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying")
+ cmdFilerCopy.Run = runCopy // break init cycle
+ cmdFilerCopy.IsDebug = cmdFilerCopy.Flag.Bool("debug", false, "verbose debug information")
+ copy.include = cmdFilerCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
+ copy.replication = cmdFilerCopy.Flag.String("replication", "", "replication type")
+ copy.collection = cmdFilerCopy.Flag.String("collection", "", "optional collection name")
+ copy.ttl = cmdFilerCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+ copy.diskType = cmdFilerCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ copy.maxMB = cmdFilerCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
+ copy.concurrenctFiles = cmdFilerCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdFilerCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
+ copy.checkSize = cmdFilerCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
+ copy.verbose = cmdFilerCopy.Flag.Bool("verbose", false, "print out details during copying")
}
-var cmdCopy = &Command{
+var cmdFilerCopy = &Command{
UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
Short: "copy one or a list of files to a filer folder",
Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
@@ -154,7 +154,7 @@ func runCopy(cmd *Command, args []string) bool {
}
copy.ttlSec = int32(ttl.Minutes()) * 60
- if *cmdCopy.IsDebug {
+ if *cmdFilerCopy.IsDebug {
grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
@@ -213,11 +213,15 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
mode := fi.Mode()
uid, gid := util.GetFileUidGid(fi)
+ fileSize := fi.Size()
+ if mode.IsDir() {
+ fileSize = 0
+ }
fileCopyTaskChan <- FileCopyTask{
sourceLocation: fileOrDir,
destinationUrlPath: destPath,
- fileSize: fi.Size(),
+ fileSize: fileSize,
fileMode: fi.Mode(),
uid: uid,
gid: gid,
@@ -377,6 +381,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
+ if assignResult.Url == "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
+ }
return nil
})
})
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index ba0b44659..6fe323fba 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "io"
"reflect"
"time"
@@ -53,6 +52,7 @@ The backup writes to another filer store specified in a backup_filer.toml.
func runFilerMetaBackup(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
// load backup_filer.toml
@@ -189,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
- tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "meta_backup",
- PathPrefix: *metaBackup.filerDirectory,
- SinceNs: startTime.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
- return err2
- }
- }
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+ return metaBackup.setOffset(lastTime)
+ })
- }
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
- })
- return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 8451ffd78..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
- "io"
"os"
"path/filepath"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -45,6 +44,7 @@ var (
func runFilerMetaTail(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var filterFunc func(dir, fname string) bool
@@ -103,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
- continue
+ return nil
}
- if err = eachEntryFunc(resp); err != nil {
+ if err := eachEntryFunc(resp); err != nil {
return err
}
- }
+ return nil
+ }, false)
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
new file mode 100644
index 000000000..8b20957e4
--- /dev/null
+++ b/weed/command/filer_remote_sync.go
@@ -0,0 +1,257 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "time"
+)
+
+type RemoteSyncOptions struct {
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+}
+
+const (
+ RemoteSyncKeyPrefix = "remote.sync."
+)
+
+var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
+
+func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+}
+func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+var (
+ remoteSyncOptions RemoteSyncOptions
+)
+
+func init() {
+ cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
+ remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
+ remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
+ remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerRemoteSynchronize = &Command{
+ UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
+ Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
+ Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
+
+ filer.remote.sync listens on filer update events.
+ If any mounted remote file is updated, it will fetch the updated content,
+ and write to the remote storage.
+`,
+}
+
+func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ remoteSyncOptions.grpcDialOption = grpcDialOption
+
+ // read filer remote storage mount mappings
+ mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
+ if readErr != nil {
+ fmt.Printf("read mount mapping: %v", readErr)
+ return false
+ }
+
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(
+ *remoteSyncOptions.filerAddress,
+ pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ "/", // does not matter
+ *remoteSyncOptions.readChunkFromFiler,
+ )
+
+ var found bool
+ for dir, remoteStorageMountLocation := range mappings.Mappings {
+ if *remoteSyncOptions.dir == dir {
+ found = true
+ storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
+ if readErr != nil {
+ fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
+ continue
+ }
+ fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
+ if err := util.Retry("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+ }); err != nil {
+ fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
+ }
+ break
+ }
+ }
+ if !found {
+ fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
+ return false
+ }
+
+ return true
+}
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+
+ dirHash := util.HashStringToLong(mountedDir)
+
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ return fmt.Errorf("lookup %s: %v", mountedDir, err)
+ }
+
+ lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if len(message.NewEntry.Chunks) == 0 {
+ return nil
+ }
+ fmt.Printf("create: %+v\n", resp)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping creating: %+v\n", resp)
+ return nil
+ }
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ fmt.Printf("delete: %+v\n", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if !shouldSendToRemote(message.NewEntry) {
+ fmt.Printf("skipping updating: %+v\n", resp)
+ return nil
+ }
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
+ fmt.Printf("update meta: %+v\n", resp)
+ return client.UpdateFileMetadata(dest, message.NewEntry)
+ }
+ }
+ fmt.Printf("update: %+v\n", resp)
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+
+ return nil
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+ })
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
+ "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+ source := string(sourcePath[len(mountDir):])
+ dest := util.FullPath(remoteMountLocation.Path).Child(source)
+ return &filer_pb.RemoteStorageLocation{
+ Name: remoteMountLocation.Name,
+ Bucket: remoteMountLocation.Bucket,
+ Path: string(dest),
+ }
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := 0; i < len(a); i++ {
+ x, y := a[i], b[i]
+ if !proto.Equal(x, y) {
+ return false
+ }
+ }
+ return true
+}
+
+func shouldSendToRemote(entry *filer_pb.Entry) bool {
+ if entry.RemoteEntry == nil {
+ return true
+ }
+ if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
+ return true
+ }
+ return false
+}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ entry.RemoteEntry = remoteEntry
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ return err
+ })
+}
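
For the path translation in toRemoteStorageLocation above, a worked example may help: strip the mounted directory prefix from the local path and append the remainder to the remote mount's path. The mount directory, file path, and remote path below are hypothetical values chosen for illustration only.

package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	mountedDir := "/buckets/mydata"             // -dir given to filer.remote.sync (hypothetical)
	localPath := "/buckets/mydata/photos/a.jpg" // path from the metadata event (hypothetical)
	remoteMountPath := "/backup"                // remoteStorageMountLocation.Path (hypothetical)

	rel := strings.TrimPrefix(localPath, mountedDir) // "/photos/a.jpg"
	dest := path.Join(remoteMountPath, rel)          // "/backup/photos/a.jpg"
	fmt.Println(dest)                                // key written to the remote bucket
}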
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 885c95540..bf0a3e140 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -7,12 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/replication"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
- _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 52fc0b477..5440811dd 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
- "io"
"strings"
"time"
)
@@ -71,8 +70,8 @@ func init() {
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
- Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
- Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
+ Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
+ Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
@@ -89,6 +88,7 @@ var cmdFilerSynchronize = &Command{
func runFilerSynchronize(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
@@ -165,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "syncTo_" + targetFiler,
- PathPrefix: sourcePath,
- SinceNs: sourceFilerOffsetTsNs,
- Signature: targetFilerSignature,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
- return err
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
+ sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
}
const (
diff --git a/weed/command/fuse.go b/weed/command/fuse.go
index 74cf2bb70..a0dcaa86c 100644
--- a/weed/command/fuse.go
+++ b/weed/command/fuse.go
@@ -2,10 +2,10 @@ package command
import (
"fmt"
- "strings"
+ "os"
"strconv"
+ "strings"
"time"
- "os"
)
func init() {
@@ -13,7 +13,7 @@ func init() {
}
type parameter struct {
- name string
+ name string
value string
}
@@ -22,6 +22,8 @@ func runFuse(cmd *Command, args []string) bool {
rawArgsLen := len(rawArgs)
option := strings.Builder{}
options := []parameter{}
+ masterProcess := true
+ fusermountPath := ""
// first parameter
i := 0
@@ -40,7 +42,7 @@ func runFuse(cmd *Command, args []string) bool {
option.Reset()
}
- // dash separator read option until next space
+ // dash separator read option until next space
} else if rawArgs[i] == '-' {
for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ {
option.WriteByte(rawArgs[i])
@@ -48,25 +50,25 @@ func runFuse(cmd *Command, args []string) bool {
options = append(options, parameter{option.String(), "true"})
option.Reset()
- // equal separator start option with pending value
+ // equal separator start option with pending value
} else if rawArgs[i] == '=' {
name := option.String()
option.Reset()
- for i++; i < rawArgsLen && rawArgs[i] != ','; i++ {
+ for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ {
// double quote separator read option until next double quote
if rawArgs[i] == '"' {
for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ {
option.WriteByte(rawArgs[i])
}
- // single quote separator read option until next single quote
+ // single quote separator read option until next single quote
} else if rawArgs[i] == '\'' {
for i++; i < rawArgsLen && rawArgs[i] != '\''; i++ {
option.WriteByte(rawArgs[i])
}
- // add chars before comma
+ // add chars before comma
} else if rawArgs[i] != ' ' {
option.WriteByte(rawArgs[i])
}
@@ -75,12 +77,12 @@ func runFuse(cmd *Command, args []string) bool {
options = append(options, parameter{name, option.String()})
option.Reset()
- // comma separator just read current option
+ // comma separator just read current option
} else if rawArgs[i] == ',' {
options = append(options, parameter{option.String(), "true"})
option.Reset()
- // what is not a separator fill option buffer
+ // what is not a separator fill option buffer
} else {
option.WriteByte(rawArgs[i])
}
@@ -97,7 +99,9 @@ func runFuse(cmd *Command, args []string) bool {
for i := 0; i < len(options); i++ {
parameter := options[i]
- switch parameter.name {
+ switch parameter.name {
+ case "child":
+ masterProcess = false
case "arg0":
mountOptions.dir = &parameter.value
case "filer":
@@ -105,7 +109,7 @@ func runFuse(cmd *Command, args []string) bool {
case "filer.path":
mountOptions.filerMountRootPath = &parameter.value
case "dirAutoCreate":
- if parsed, err := strconv.ParseBool(parameter.value); err != nil {
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
mountOptions.dirAutoCreate = &parsed
} else {
panic(fmt.Errorf("dirAutoCreate: %s", err))
@@ -117,14 +121,14 @@ func runFuse(cmd *Command, args []string) bool {
case "disk":
mountOptions.diskType = &parameter.value
case "ttl":
- if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil {
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
intValue := int(parsed)
mountOptions.ttlSec = &intValue
} else {
panic(fmt.Errorf("ttl: %s", err))
}
case "chunkSizeLimitMB":
- if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil {
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
intValue := int(parsed)
mountOptions.chunkSizeLimitMB = &intValue
} else {
@@ -132,7 +136,7 @@ func runFuse(cmd *Command, args []string) bool {
}
case "concurrentWriters":
i++
- if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil {
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
intValue := int(parsed)
mountOptions.concurrentWriters = &intValue
} else {
@@ -141,7 +145,7 @@ func runFuse(cmd *Command, args []string) bool {
case "cacheDir":
mountOptions.cacheDir = &parameter.value
case "cacheCapacityMB":
- if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err != nil {
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err == nil {
mountOptions.cacheSizeMB = &parsed
} else {
panic(fmt.Errorf("cacheCapacityMB: %s", err))
@@ -149,7 +153,7 @@ func runFuse(cmd *Command, args []string) bool {
case "dataCenter":
mountOptions.dataCenter = &parameter.value
case "allowOthers":
- if parsed, err := strconv.ParseBool(parameter.value); err != nil {
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
mountOptions.allowOthers = &parsed
} else {
panic(fmt.Errorf("allowOthers: %s", err))
@@ -157,7 +161,7 @@ func runFuse(cmd *Command, args []string) bool {
case "umask":
mountOptions.umaskString = &parameter.value
case "nonempty":
- if parsed, err := strconv.ParseBool(parameter.value); err != nil {
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
mountOptions.nonempty = &parsed
} else {
panic(fmt.Errorf("nonempty: %s", err))
@@ -169,7 +173,7 @@ func runFuse(cmd *Command, args []string) bool {
case "map.gid":
mountOptions.gidMap = &parameter.value
case "readOnly":
- if parsed, err := strconv.ParseBool(parameter.value); err != nil {
+ if parsed, err := strconv.ParseBool(parameter.value); err == nil {
mountOptions.readOnly = &parsed
} else {
panic(fmt.Errorf("readOnly: %s", err))
@@ -179,17 +183,51 @@ func runFuse(cmd *Command, args []string) bool {
case "memprofile":
mountMemProfile = &parameter.value
case "readRetryTime":
- if parsed, err := time.ParseDuration(parameter.value); err != nil {
+ if parsed, err := time.ParseDuration(parameter.value); err == nil {
mountReadRetryTime = &parsed
} else {
panic(fmt.Errorf("readRetryTime: %s", err))
}
+ case "fusermount.path":
+ fusermountPath = parameter.value
+ }
+ }
+
+ // the master start the child, release it then finish himself
+ if masterProcess {
+ arg0, err := os.Executable()
+ if err != nil {
+ panic(err)
+ }
+
+ argv := append(os.Args, "-o", "child")
+
+ attr := os.ProcAttr{}
+ attr.Env = os.Environ()
+
+ child, err := os.StartProcess(arg0, argv, &attr)
+
+ if err != nil {
+ panic(fmt.Errorf("master process can not start child process: %s", err))
}
+
+ err = child.Release()
+
+ if err != nil {
+ panic(fmt.Errorf("master process can not release child process: %s", err))
+ }
+
+ return true
}
- // I don't know why PATH environment variable is lost
- if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin"); err != nil {
- panic(fmt.Errorf("setenv: %s", err))
+ if fusermountPath != "" {
+ if err := os.Setenv("PATH", fusermountPath); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
+ } else if os.Getenv("PATH") == "" {
+ if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
}
// just call "weed mount" command
@@ -198,7 +236,7 @@ func runFuse(cmd *Command, args []string) bool {
var cmdFuse = &Command{
UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
- Short: "Allow use weed with linux's mount command",
+ Short: "Allow use weed with linux's mount command",
Long: `Allow use weed with linux's mount command
You can use -t weed on mount command:
diff --git a/weed/command/gateway.go b/weed/command/gateway.go
deleted file mode 100644
index 8a6f852a5..000000000
--- a/weed/command/gateway.go
+++ /dev/null
@@ -1,93 +0,0 @@
-package command
-
-import (
- "net/http"
- "strconv"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- gatewayOptions GatewayOptions
-)
-
-type GatewayOptions struct {
- masters *string
- filers *string
- bindIp *string
- port *int
- maxMB *int
-}
-
-func init() {
- cmdGateway.Run = runGateway // break init cycle
- gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers")
- gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers")
- gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to")
- gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port")
- gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit")
-}
-
-var cmdGateway = &Command{
- UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*",
- Short: "start a gateway server that points to a list of master servers or a list of filers",
- Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages.
-
- POST /blobs/
- upload the blob and return a chunk id
- DELETE /blobs/<chunk_id>
- delete a chunk id
-
- /*
- POST /files/path/to/a/file
- save /path/to/a/file on filer
- DELETE /files/path/to/a/file
- delete /path/to/a/file on filer
-
- POST /topics/topicName
- save on filer to /topics/topicName/<ds>/ts.json
- */
-`,
-}
-
-func runGateway(cmd *Command, args []string) bool {
-
- util.LoadConfiguration("security", false)
-
- gatewayOptions.startGateway()
-
- return true
-}
-
-func (gw *GatewayOptions) startGateway() {
-
- defaultMux := http.NewServeMux()
-
- _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{
- Masters: strings.Split(*gw.masters, ","),
- Filers: strings.Split(*gw.filers, ","),
- MaxMB: *gw.maxMB,
- })
- if gws_err != nil {
- glog.Fatalf("Gateway startup error: %v", gws_err)
- }
-
- glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port)
- gatewayListener, e := util.NewListener(
- *gw.bindIp+":"+strconv.Itoa(*gw.port),
- time.Duration(10)*time.Second,
- )
- if e != nil {
- glog.Fatalf("Filer listener error: %v", e)
- }
-
- httpS := &http.Server{Handler: defaultMux}
- if err := httpS.Serve(gatewayListener); err != nil {
- glog.Fatalf("Gateway Fail to serve: %v", e)
- }
-
-}
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 17d0832cb..ed4eea543 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -49,6 +49,7 @@ func (iamopt *IamOptions) startIamServer() bool {
return false
}
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/command/imports.go b/weed/command/imports.go
new file mode 100644
index 000000000..ce0bf0e10
--- /dev/null
+++ b/weed/command/imports.go
@@ -0,0 +1,30 @@
+package command
+
+import (
+ _ "net/http/pprof"
+
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
+
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+ _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+)
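
The new imports.go exists purely for side effects: each blank import registers a filer store, replication sink, or remote storage client in its package's init(), and the net/http/pprof import installs the profiling handlers used by the -debug option added to weed filer above. A generic sketch of that blank-import registration pattern follows; it is not the exact SeaweedFS registry code.

package registry

// Store is a stand-in for a pluggable driver interface (filer store, sink, ...).
type Store interface {
	Name() string
}

var stores = map[string]func() Store{}

// Register is called from each driver package's init() function.
func Register(name string, factory func() Store) {
	stores[name] = factory
}

// Lookup returns a driver that was registered via a blank import of its package.
func Lookup(name string) (Store, bool) {
	factory, ok := stores[name]
	if !ok {
		return nil, false
	}
	return factory(), true
}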
diff --git a/weed/command/master.go b/weed/command/master.go
index 0f5e2156d..4eb43ee09 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -54,7 +54,7 @@ func init() {
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
// m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
+ m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.")
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
@@ -198,7 +198,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp
Host: *m.ip,
Port: *m.port,
MetaFolder: *m.metaFolder,
- VolumeSizeLimitMB: *m.volumeSizeLimitMB,
+ VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
VolumePreallocate: *m.volumePreallocate,
// PulseSeconds: *m.pulseSeconds,
DefaultReplicaPlacement: *m.defaultReplication,
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
new file mode 100644
index 000000000..b628f7abf
--- /dev/null
+++ b/weed/command/master_follower.go
@@ -0,0 +1,143 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var (
+ mf MasterOptions
+)
+
+func init() {
+ cmdMasterFollower.Run = runMasterFollower // break init cycle
+ mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
+ mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to")
+ mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
+
+ mf.ip = aws.String(util.DetectedHostAddress())
+ mf.metaFolder = aws.String("")
+ mf.volumeSizeLimitMB = nil
+ mf.volumePreallocate = nil
+ mf.defaultReplication = nil
+ mf.garbageThreshold = aws.Float64(0.1)
+ mf.whiteList = nil
+ mf.disableHttp = aws.Bool(false)
+ mf.metricsAddress = aws.String("")
+ mf.metricsIntervalSec = aws.Int(0)
+ mf.raftResumeState = aws.Bool(false)
+}
+
+var cmdMasterFollower = &Command{
+ UsageLine: "master.follower -port=9333 -masters=<master1Host>:<master1Port>",
+ Short: "start a master follower",
+ Long: `start a master follower to provide volume=>location mapping service
+
+ The master follower does not participate in master election.
+ It just follow the existing masters, and listen for any volume location changes.
+
+ In most cases, the master follower is not needed. In big data centers with thousands of volume
+ servers. In theory, the master may have trouble to keep up with the write requests and read requests.
+
+ The master follower can relieve the master from from read requests, which only needs to
+ lookup a fileId or volumeId.
+
+ The master follower currently can handle fileId lookup requests:
+ /dir/lookup?volumeId=4
+ /dir/lookup?fileId=4,49c50924569199
+ And gRPC API
+ rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {}
+
+ This master follower is stateless and can run from any place.
+
+ `,
+}
+
+func runMasterFollower(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("master", false)
+
+ startMasterFollower(mf)
+
+ return true
+}
+
+func startMasterFollower(masterOptions MasterOptions) {
+
+ // collect settings from main masters
+ masters := strings.Split(*mf.peers, ",")
+ masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters)
+ if err != nil {
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
+ return
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
+ for i := 0; i < 10; i++ {
+ err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err)
+ }
+ masterOptions.defaultReplication = &resp.DefaultReplication
+ masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB))
+ masterOptions.volumePreallocate = &resp.VolumePreallocate
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("failed to talk to filer %v: %v", masterGrpcAddresses, err)
+ glog.V(0).Infof("wait for %d seconds ...", i+1)
+ time.Sleep(time.Duration(i+1) * time.Second)
+ }
+ }
+ if err != nil {
+ glog.Errorf("failed to talk to filer %v: %v", masterGrpcAddresses, err)
+ return
+ }
+
+ option := masterOptions.toMasterOption(nil)
+ option.IsFollower = true
+
+ r := mux.NewRouter()
+ ms := weed_server.NewMasterServer(r, option, masters)
+ listeningAddress := *masterOptions.ipBind + ":" + strconv.Itoa(*masterOptions.port)
+ glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
+ masterListener, e := util.NewListener(listeningAddress, 0)
+ if e != nil {
+ glog.Fatalf("Master startup error: %v", e)
+ }
+
+ // starting grpc server
+ grpcPort := *masterOptions.port + 10000
+ grpcL, err := util.NewListener(*masterOptions.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ if err != nil {
+ glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
+ master_pb.RegisterSeaweedServer(grpcS, ms)
+ reflection.Register(grpcS)
+ glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort)
+ go grpcS.Serve(grpcL)
+
+ go ms.MasterClient.KeepConnectedToMaster()
+
+ // start http server
+ httpS := &http.Server{Handler: r}
+ go httpS.Serve(masterListener)
+
+ select {}
+}
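
The master.follower answers volume lookup requests on its own HTTP port (9334 by default) with gRPC on port+10000. A minimal client sketch follows; the /dir/lookup?volumeId=4 path comes from the command help text above, while the localhost host, the default port, and the idea of simply printing the JSON body are assumptions for illustration.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func main() {
	// Ask a locally running follower where volume 4 is located.
	resp, err := http.Get("http://localhost:9334/dir/lookup?volumeId=4")
	if err != nil {
		log.Fatalf("lookup request failed: %v", err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("read response: %v", err)
	}
	fmt.Printf("%s\n", body) // JSON describing the volume locations
}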
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index dce2197d6..cdf340067 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -246,6 +246,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
server := fs.New(c, nil)
seaweedFileSystem.Server = server
+ seaweedFileSystem.StartBackgroundTasks()
err = server.Serve(seaweedFileSystem)
// check if the mount process has an error to report
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 2d6729bd3..886c0ac5e 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -1,6 +1,8 @@
package command
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/command/scaffold"
"io/ioutil"
"path/filepath"
)
@@ -35,17 +37,17 @@ func runScaffold(cmd *Command, args []string) bool {
content := ""
switch *config {
case "filer":
- content = FILER_TOML_EXAMPLE
+ content = scaffold.Filer
case "notification":
- content = NOTIFICATION_TOML_EXAMPLE
+ content = scaffold.Notification
case "replication":
- content = REPLICATION_TOML_EXAMPLE
+ content = scaffold.Replication
case "security":
- content = SECURITY_TOML_EXAMPLE
+ content = scaffold.Security
case "master":
- content = MASTER_TOML_EXAMPLE
+ content = scaffold.Master
case "shell":
- content = SHELL_TOML_EXAMPLE
+ content = scaffold.Shell
}
if content == "" {
println("need a valid -config option")
@@ -55,519 +57,7 @@ func runScaffold(cmd *Command, args []string) bool {
if *outputPath != "" {
ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644)
} else {
- println(content)
+ fmt.Println(content)
}
return true
}
-
-const (
- FILER_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used with "weed filer" or "weed server -filer"
-# Put this file to one of the location, with descending priority
-# ./filer.toml
-# $HOME/.seaweedfs/filer.toml
-# /etc/seaweedfs/filer.toml
-
-####################################################
-# Customizable filer server options
-####################################################
-[filer.options]
-# with http DELETE, by default the filer would check whether a folder is empty.
-# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
-recursive_delete = false
-# directories under this folder will be automatically creating a separate bucket
-buckets_folder = "/buckets"
-
-####################################################
-# The following are filer store options
-####################################################
-
-[leveldb2]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-# faster than previous leveldb, recommended.
-enabled = true
-dir = "./filerldb2" # directory to store level db files
-
-[leveldb3]
-# similar to leveldb2.
-# each bucket has its own meta store.
-enabled = false
-dir = "./filerldb3" # directory to store level db files
-
-[rocksdb]
-# local on disk, similar to leveldb
-# since it is using a C wrapper, you need to install rocksdb and build it by yourself
-enabled = false
-dir = "./filerrdb" # directory to store rocksdb files
-
-[sqlite]
-# local on disk, similar to leveldb
-enabled = false
-dbFile = "./filer.db" # sqlite db file
-
-[mysql] # or memsql, tidb
-# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
-# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
-# directory TEXT COMMENT 'full path to parent directory',
-# meta LONGBLOB,
-# PRIMARY KEY (dirhash, name)
-# ) DEFAULT CHARSET=utf8;
-
-enabled = false
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = "" # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[mysql2] # or memsql, tidb
-enabled = false
-createTable = """
- CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
- dirhash BIGINT,
- name VARCHAR(1000) BINARY,
- directory TEXT,
- meta LONGBLOB,
- PRIMARY KEY (dirhash, name)
- ) DEFAULT CHARSET=utf8;
-"""
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = "" # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[postgres] # or cockroachdb, YugabyteDB
-# CREATE TABLE IF NOT EXISTS filemeta (
-# dirhash BIGINT,
-# name VARCHAR(65535),
-# directory VARCHAR(65535),
-# meta bytea,
-# PRIMARY KEY (dirhash, name)
-# );
-enabled = false
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres" # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[postgres2]
-enabled = false
-createTable = """
- CREATE TABLE IF NOT EXISTS "%s" (
- dirhash BIGINT,
- name VARCHAR(65535),
- directory VARCHAR(65535),
- meta bytea,
- PRIMARY KEY (dirhash, name)
- );
-"""
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres" # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[cassandra]
-# CREATE TABLE filemeta (
-# directory varchar,
-# name varchar,
-# meta blob,
-# PRIMARY KEY (directory, name)
-# ) WITH CLUSTERING ORDER BY (name ASC);
-enabled = false
-keyspace="seaweedfs"
-hosts=[
- "localhost:9042",
-]
-username=""
-password=""
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[hbase]
-enabled = false
-zkquorum = ""
-table = "seaweedfs"
-
-[redis2]
-enabled = false
-address = "localhost:6379"
-password = ""
-database = 0
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[redis_cluster2]
-enabled = false
-addresses = [
- "localhost:30001",
- "localhost:30002",
- "localhost:30003",
- "localhost:30004",
- "localhost:30005",
- "localhost:30006",
-]
-password = ""
-# allows reads from slave servers or the master, but all writes still go to the master
-readOnly = false
-# automatically use the closest Redis server for reads
-routeByLatency = false
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[etcd]
-enabled = false
-servers = "localhost:2379"
-timeout = "3s"
-
-[mongodb]
-enabled = false
-uri = "mongodb://localhost:27017"
-option_pool_size = 0
-database = "seaweedfs"
-
-[elastic7]
-enabled = false
-servers = [
- "http://localhost1:9200",
- "http://localhost2:9200",
- "http://localhost3:9200",
-]
-username = ""
-password = ""
-sniff_enabled = false
-healthcheck_enabled = false
-# increase the value is recommend, be sure the value in Elastic is greater or equal here
-index.max_result_window = 10000
-
-
-
-##########################
-##########################
-# To add path-specific filer store:
-#
-# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
-# 2. Add a location configuraiton. E.g., location = "/tmp/"
-# 3. Copy and customize all other configurations.
-# Make sure they are not the same if using the same store type!
-# 4. Set enabled to true
-#
-# The following is just using redis as an example
-##########################
-[redis2.tmp]
-enabled = false
-location = "/tmp/"
-address = "localhost:6379"
-password = ""
-database = 1
-
-`
-
- NOTIFICATION_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate"
-# Put this file to one of the location, with descending priority
-# ./notification.toml
-# $HOME/.seaweedfs/notification.toml
-# /etc/seaweedfs/notification.toml
-
-####################################################
-# notification
-# send and receive filer updates for each file to an external message queue
-####################################################
-[notification.log]
-# this is only for debugging perpose and does not work with "weed filer.replicate"
-enabled = false
-
-
-[notification.kafka]
-enabled = false
-hosts = [
- "localhost:9092"
-]
-topic = "seaweedfs_filer"
-offsetFile = "./last.offset"
-offsetSaveIntervalSeconds = 10
-
-
-[notification.aws_sqs]
-# experimental, let me know if it works
-enabled = false
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-sqs_queue_name = "my_filer_queue" # an existing queue name
-
-
-[notification.google_pub_sub]
-# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json" # path to json credential file
-project_id = "" # an existing project id
-topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
-
-[notification.gocdk_pub_sub]
-# The Go Cloud Development Kit (https://gocloud.dev).
-# PubSub API (https://godoc.org/gocloud.dev/pubsub).
-# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
-enabled = false
-# This URL will Dial the RabbitMQ server at the URL in the environment
-# variable RABBIT_SERVER_URL and open the exchange "myexchange".
-# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
-# create binding myexchange => myqueue
-topic_url = "rabbit://myexchange"
-sub_url = "rabbit://myqueue"
-`
-
- REPLICATION_TOML_EXAMPLE = `
-# A sample TOML config file for replicating SeaweedFS filer
-# Used with "weed filer.backup"
-# Using with "weed filer.replicate" is deprecated.
-# Put this file to one of the location, with descending priority
-# ./replication.toml
-# $HOME/.seaweedfs/replication.toml
-# /etc/seaweedfs/replication.toml
-
-[source.filer] # deprecated. Only useful with "weed filer.replicate"
-enabled = true
-grpcAddress = "localhost:18888"
-# all files under this directory tree are replicated.
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all files with this "prefix" are sent to notification message queue.
-directory = "/buckets"
-
-[sink.local]
-enabled = false
-directory = "/data"
-# all replicated files are under modified time as yyyy-mm-dd directories
-# so each date directory contains all new and updated files.
-is_incremental = false
-
-[sink.filer]
-enabled = false
-grpcAddress = "localhost:18888"
-# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
-replication = ""
-collection = ""
-ttlSec = 0
-is_incremental = false
-
-[sink.s3]
-# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
-enabled = false
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name" # an existing bucket
-directory = "/" # destination directory
-endpoint = ""
-is_incremental = false
-
-[sink.google_cloud_storage]
-# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json" # path to json credential file
-bucket = "your_bucket_seaweedfs" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.azure]
-# experimental, let me know if it works
-enabled = false
-account_name = ""
-account_key = ""
-container = "mycontainer" # an existing container
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.backblaze]
-enabled = false
-b2_account_id = ""
-b2_master_application_key = ""
-bucket = "mybucket" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-`
-
- SECURITY_TOML_EXAMPLE = `
-# Put this file to one of the location, with descending priority
-# ./security.toml
-# $HOME/.seaweedfs/security.toml
-# /etc/seaweedfs/security.toml
-# this file is read by master, volume server, and filer
-
-# the jwt signing key is read by master and volume server.
-# a jwt defaults to expire after 10 seconds.
-[jwt.signing]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# jwt for read is only supported with master+volume setup. Filer does not support this mode.
-[jwt.signing.read]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# all grpc tls authentications are mutual
-# the values for the following ca, cert, and key are paths to the PERM files.
-# the host name is not checked, so the PERM files can be shared.
-[grpc]
-ca = ""
-# Set wildcard domain for enable TLS authentication by common names
-allowed_wildcard_domain = "" # .mycompany.com
-
-[grpc.volume]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.master]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.filer]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.msg_broker]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-# use this for any place needs a grpc client
-# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
-[grpc.client]
-cert = ""
-key = ""
-
-# volume server https options
-# Note: work in progress!
-# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
-[https.client]
-enabled = true
-[https.volume]
-cert = ""
-key = ""
-
-
-`
-
- MASTER_TOML_EXAMPLE = `
-# Put this file to one of the location, with descending priority
-# ./master.toml
-# $HOME/.seaweedfs/master.toml
-# /etc/seaweedfs/master.toml
-# this file is read by master
-
-[master.maintenance]
-# periodically run these scripts are the same as running them from 'weed shell'
-scripts = """
- lock
- ec.encode -fullPercent=95 -quietFor=1h
- ec.rebuild -force
- ec.balance -force
- volume.balance -force
- volume.fix.replication
- unlock
-"""
-sleep_minutes = 17 # sleep minutes between each script execution
-
-[master.filer]
-default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
-
-
-[master.sequencer]
-type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
-# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
-# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = "http://127.0.0.1:2379"
-
-
-# configurations for tiered cloud storage
-# old volumes are transparently moved to cloud for cost efficiency
-[storage.backend]
- [storage.backend.s3.default]
- enabled = false
- aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
- aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
- region = "us-east-2"
- bucket = "your_bucket_name" # an existing bucket
- endpoint = ""
-
-# create this number of logical volumes if no more writable volumes
-# count_x means how many copies of data.
-# e.g.:
-# 000 has only one copy, copy_1
-# 010 and 001 has two copies, copy_2
-# 011 has only 3 copies, copy_3
-[master.volume_growth]
-copy_1 = 7 # create 1 x 7 = 7 actual volumes
-copy_2 = 6 # create 2 x 6 = 12 actual volumes
-copy_3 = 3 # create 3 x 3 = 9 actual volumes
-copy_other = 1 # create n x 1 = n actual volumes
-
-# configuration flags for replication
-[master.replication]
-# any replication counts should be considered minimums. If you specify 010 and
-# have 3 different racks, that's still considered writable. Writes will still
-# try to replicate to all available volumes. You should only use this option
-# if you are doing your own replication or periodic sync of volumes.
-treat_replication_as_minimums = false
-
-`
- SHELL_TOML_EXAMPLE = `
-
-[cluster]
-default = "c1"
-
-[cluster.c1]
-master = "localhost:9333" # comma-separated master servers
-filer = "localhost:8888" # filer host and port
-
-[cluster.c2]
-master = ""
-filer = ""
-
-`
-)
diff --git a/weed/command/scaffold/example.go b/weed/command/scaffold/example.go
new file mode 100644
index 000000000..6be6804e5
--- /dev/null
+++ b/weed/command/scaffold/example.go
@@ -0,0 +1,21 @@
+package scaffold
+
+import _ "embed"
+
+//go:embed filer.toml
+var Filer string
+
+//go:embed notification.toml
+var Notification string
+
+//go:embed replication.toml
+var Replication string
+
+//go:embed security.toml
+var Security string
+
+//go:embed master.toml
+var Master string
+
+//go:embed shell.toml
+var Shell string
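
The new example.go above relies on Go 1.16's //go:embed directive: the sibling .toml files added below are compiled into the binary as strings, replacing the Go constants removed from scaffold.go. A hypothetical sanity-check test (not part of this commit) could confirm an embedded default parses as TOML, using the spf13/viper dependency the project already uses via util.GetViper:

package scaffold_test

import (
	"strings"
	"testing"

	"github.com/spf13/viper"

	"github.com/chrislusf/seaweedfs/weed/command/scaffold"
)

func TestEmbeddedFilerTomlParses(t *testing.T) {
	v := viper.New()
	v.SetConfigType("toml")
	if err := v.ReadConfig(strings.NewReader(scaffold.Filer)); err != nil {
		t.Fatalf("embedded filer.toml is not valid TOML: %v", err)
	}
	if !v.IsSet("leveldb2.enabled") {
		t.Fatalf("expected a default [leveldb2] section")
	}
}
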
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
new file mode 100644
index 000000000..9e9258865
--- /dev/null
+++ b/weed/command/scaffold/filer.toml
@@ -0,0 +1,232 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used with "weed filer" or "weed server -filer"
+# Put this file in one of the following locations, in descending priority
+# ./filer.toml
+# $HOME/.seaweedfs/filer.toml
+# /etc/seaweedfs/filer.toml
+
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer checks whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
+
+####################################################
+# The following are filer store options
+####################################################
+
+[leveldb2]
+# local on disk, mostly for simple single-machine setup, fairly scalable
+# faster than previous leveldb, recommended.
+enabled = true
+dir = "./filerldb2" # directory to store level db files
+
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it uses a C wrapper, you need to install rocksdb and build it yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
+
+[sqlite]
+# local on disk, similar to leveldb
+enabled = false
+dbFile = "./filer.db" # sqlite db file
+
+[mysql] # or memsql, tidb
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
+# directory TEXT COMMENT 'full path to parent directory',
+# meta LONGBLOB,
+# PRIMARY KEY (dirhash, name)
+# ) DEFAULT CHARSET=utf8;
+
+enabled = false
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or change the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[mysql2] # or memsql, tidb
+enabled = false
+createTable = """
+  CREATE TABLE IF NOT EXISTS `%s` (
+ dirhash BIGINT,
+ name VARCHAR(1000) BINARY,
+ directory TEXT,
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+ ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or change the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[postgres] # or cockroachdb, YugabyteDB
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT,
+# name VARCHAR(65535),
+# directory VARCHAR(65535),
+# meta bytea,
+# PRIMARY KEY (dirhash, name)
+# );
+enabled = false
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or change the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[postgres2]
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS "%s" (
+ dirhash BIGINT,
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
+ meta bytea,
+ PRIMARY KEY (dirhash, name)
+ );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or change the query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[cassandra]
+# CREATE TABLE filemeta (
+# directory varchar,
+# name varchar,
+# meta blob,
+# PRIMARY KEY (directory, name)
+# ) WITH CLUSTERING ORDER BY (name ASC);
+enabled = false
+keyspace = "seaweedfs"
+hosts = [
+ "localhost:9042",
+]
+username = ""
+password = ""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+# Name of the datacenter local to this filer, used as host selection fallback.
+localDC = ""
+
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
+
+[redis2]
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[redis_cluster2]
+enabled = false
+addresses = [
+ "localhost:30001",
+ "localhost:30002",
+ "localhost:30003",
+ "localhost:30004",
+ "localhost:30005",
+ "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
+[mongodb]
+enabled = false
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+ "http://localhost1:9200",
+ "http://localhost2:9200",
+ "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increasing this value is recommended; make sure the value in Elasticsearch is greater than or equal to this
+index.max_result_window = 10000
+
+
+
+##########################
+##########################
+# To add path-specific filer store:
+#
+# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
+# 2. Add a location configuration. E.g., location = "/tmp/"
+# 3. Copy and customize all other configurations.
+# Make sure they are not the same if using the same store type!
+# 4. Set enabled to true
+#
+# The following is just using redis as an example
+##########################
+[redis2.tmp]
+enabled = false
+location = "/tmp/"
+address = "localhost:6379"
+password = ""
+database = 1
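
One detail worth noting in filer.toml above: the %s and %[1]s placeholders inside createTable and upsertQuery are fmt format verbs that the store implementation is expected to expand into a concrete table name (for example, one table per bucket). A rough illustration of that expansion, with a hypothetical table name:

package main

import "fmt"

func main() {
	// template copied from the postgres2 upsertQuery above; bucketTable is hypothetical
	upsertTemplate := `INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ` +
		`ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta`
	bucketTable := "bucket_photos"

	// %[1]s refers to the first argument, so the table name is substituted in both places
	fmt.Println(fmt.Sprintf(upsertTemplate, bucketTable))
}
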
diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml
new file mode 100644
index 000000000..020f48e36
--- /dev/null
+++ b/weed/command/scaffold/master.toml
@@ -0,0 +1,63 @@
+# Put this file in one of the following locations, in descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# these scripts run periodically; they are the same as running them from 'weed shell'
+scripts = """
+ lock
+ ec.encode -fullPercent=95 -quietFor=1h
+ ec.rebuild -force
+ ec.balance -force
+ volume.deleteEmpty -quietFor=24h -force
+ volume.balance -force
+ volume.fix.replication
+ unlock
+"""
+sleep_minutes = 17 # sleep minutes between each script execution
+
+[master.filer]
+default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
+
+
+[master.sequencer]
+type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+# when sequencer.type = snowflake, the snowflake id must be different from other masters
+sequencer_snowflake_id = 0 # any number between 1~1023
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+[storage.backend.s3.default]
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+endpoint = ""
+
+# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+# 000 has only one copy, copy_1
+# 010 and 001 have two copies, copy_2
+# 011 has three copies, copy_3
+[master.volume_growth]
+copy_1 = 7 # create 1 x 7 = 7 actual volumes
+copy_2 = 6 # create 2 x 6 = 12 actual volumes
+copy_3 = 3 # create 3 x 3 = 9 actual volumes
+copy_other = 1 # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
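
To make the volume_growth comments above concrete: the copy count comes from the replication string (one original plus the sum of its digits), and the copy_N values say how many logical volumes to grow at once. A small hedged sketch of that arithmetic (copiesFor is an illustrative helper, not the SeaweedFS implementation):

package main

import "fmt"

// copiesFor counts the replicas implied by a replication string such as "010":
// one original copy plus the sum of the three digits.
func copiesFor(replication string) int {
	copies := 1
	for _, d := range replication {
		copies += int(d - '0')
	}
	return copies
}

func main() {
	// growth counts from [master.volume_growth] above
	growth := map[int]int{1: 7, 2: 6, 3: 3}

	for _, r := range []string{"000", "001", "010", "011"} {
		c := copiesFor(r)
		fmt.Printf("replication %s: %d copies x %d logical volumes = %d actual volumes\n",
			r, c, growth[c], c*growth[c])
	}
}
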
diff --git a/weed/command/scaffold/notification.toml b/weed/command/scaffold/notification.toml
new file mode 100644
index 000000000..f35101edd
--- /dev/null
+++ b/weed/command/scaffold/notification.toml
@@ -0,0 +1,54 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate"
+# Put this file in one of the following locations, in descending priority
+# ./notification.toml
+# $HOME/.seaweedfs/notification.toml
+# /etc/seaweedfs/notification.toml
+
+####################################################
+# notification
+# send and receive filer updates for each file to an external message queue
+####################################################
+[notification.log]
+# this is only for debugging purposes and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.kafka]
+enabled = false
+hosts = [
+ "localhost:9092"
+]
+topic = "seaweedfs_filer"
+offsetFile = "./last.offset"
+offsetSaveIntervalSeconds = 10
+
+
+[notification.aws_sqs]
+# experimental, let me know if it works
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue" # an existing queue name
+
+
+[notification.google_pub_sub]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+project_id = "" # an existing project id
+topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin. Create myexchange of type fanout and myqueue, then
+# create a binding myexchange => myqueue
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
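
The gocdk_pub_sub section above uses Go CDK driver URLs. A minimal hypothetical publisher (not part of this commit) that opens the topic_url shown, with the RabbitMQ address taken from the RABBIT_SERVER_URL environment variable as the comment describes:

package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/rabbitpubsub" // registers the rabbit:// URL scheme
)

func main() {
	ctx := context.Background()

	// topic_url from the config above; the driver dials RABBIT_SERVER_URL
	topic, err := pubsub.OpenTopic(ctx, "rabbit://myexchange")
	if err != nil {
		log.Fatal(err)
	}
	defer topic.Shutdown(ctx)

	if err := topic.Send(ctx, &pubsub.Message{Body: []byte("file updated")}); err != nil {
		log.Fatal(err)
	}
}
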
diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml
new file mode 100644
index 000000000..c463c8077
--- /dev/null
+++ b/weed/command/scaffold/replication.toml
@@ -0,0 +1,71 @@
+# A sample TOML config file for replicating SeaweedFS filer
+# Used with "weed filer.backup"
+# Using with "weed filer.replicate" is deprecated.
+# Put this file in one of the following locations, in descending priority
+# ./replication.toml
+# $HOME/.seaweedfs/replication.toml
+# /etc/seaweedfs/replication.toml
+
+[source.filer] # deprecated. Only useful with "weed filer.replicate"
+enabled = true
+grpcAddress = "localhost:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = false
+directory = "/data"
+# all replicated files are placed under yyyy-mm-dd directories named by modified time
+# so each date directory contains all new and updated files.
+is_incremental = false
+
+[sink.filer]
+enabled = false
+grpcAddress = "localhost:18888"
+# all replicated files are under this directory tree
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all received files will be "prefixed" to this directory.
+directory = "/backup"
+replication = ""
+collection = ""
+ttlSec = 0
+is_incremental = false
+
+[sink.s3]
+# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
+# default loads credentials from the shared credentials file (~/.aws/credentials).
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+directory = "/" # destination directory
+endpoint = ""
+is_incremental = false
+
+[sink.google_cloud_storage]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+bucket = "your_bucket_seaweedfs" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.azure]
+# experimental, let me know if it works
+enabled = false
+account_name = ""
+account_key = ""
+container = "mycontainer" # an existing container
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.backblaze]
+enabled = false
+b2_account_id = ""
+b2_master_application_key = ""
+bucket = "mybucket" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
new file mode 100644
index 000000000..0c69b2f24
--- /dev/null
+++ b/weed/command/scaffold/security.toml
@@ -0,0 +1,60 @@
+# Put this file in one of the following locations, in descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt defaults to expire after 10 seconds.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# jwt for read is only supported with master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PEM files.
+# the host name is not checked, so the PEM files can be shared.
+[grpc]
+ca = ""
+# Set a wildcard domain to enable TLS authentication by common names
+allowed_wildcard_domain = "" # .mycompany.com
+
+[grpc.volume]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.master]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.filer]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.msg_broker]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+# use this for any place that needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+# volume server https options
+# Note: work in progress!
+# this does not yet work with other clients, e.g., "weed filer|mount".
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
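
For context on how the [grpc.client] section above is consumed: the upload.go hunk later in this diff builds a grpc.DialOption via security.LoadClientTLS(util.GetViper(), "grpc.client"). A rough client-side sketch under that assumption (the address is a placeholder):

package main

import (
	"log"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	util.LoadConfiguration("security", false) // loads security.toml as documented above

	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	conn, err := grpc.Dial("localhost:19333", grpcDialOption) // hypothetical master gRPC address
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// ... use conn with a generated gRPC client stub
}
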
diff --git a/weed/command/scaffold/shell.toml b/weed/command/scaffold/shell.toml
new file mode 100644
index 000000000..288ae2efe
--- /dev/null
+++ b/weed/command/scaffold/shell.toml
@@ -0,0 +1,10 @@
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
diff --git a/weed/command/server.go b/weed/command/server.go
index d2bd6466e..c784d90b9 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -3,6 +3,7 @@ package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/util/grace"
+ "net/http"
"os"
"strings"
"time"
@@ -16,6 +17,8 @@ import (
type ServerOptions struct {
cpuprofile *string
memprofile *string
+ debug *bool
+ debugPort *int
v VolumeServerOptions
}
@@ -78,13 +81,15 @@ var (
func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
+ serverOptions.debug = cmdServer.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2")
+ serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")
masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
- masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "000", "Default replication type if not specified.")
+ masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.")
masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
@@ -107,10 +112,11 @@ func init() {
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+	serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+ serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
@@ -139,6 +145,10 @@ func init() {
func runServer(cmd *Command, args []string) bool {
+ if *serverOptions.debug {
+ go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil)
+ }
+
util.LoadConfiguration("security", false)
util.LoadConfiguration("master", false)
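
A note on the new -debug flag wired up above: http.ListenAndServe with a nil handler serves http.DefaultServeMux, so the /debug/pprof/* paths mentioned in the flag description are only available if net/http/pprof is blank-imported somewhere in the binary (presumably the new weed/command/imports.go listed in the diffstat). A standalone sketch of that behaviour:

package main

import (
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	// then e.g. http://localhost:6060/debug/pprof/goroutine?debug=2 responds
	http.ListenAndServe(":6060", nil) // nil handler means http.DefaultServeMux
}
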
diff --git a/weed/command/shell.go b/weed/command/shell.go
index c9976e809..4a9f4b027 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -55,6 +55,7 @@ func runShell(command *Command, args []string) bool {
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ shellOptions.FilerAddress = *shellInitialFiler
if err != nil {
fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
return false
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 0f9361b40..9ae1befab 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -71,20 +71,20 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master)
if err != nil {
fmt.Printf("upload: %v", err)
return false
}
if *upload.replication == "" {
- *upload.replication = defaultCollection
+ *upload.replication = defaultReplication
}
if len(args) == 0 {
if *upload.dir == "" {
return false
}
- filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
+ err = filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
if err == nil {
if !info.IsDir() {
if *upload.include != "" {
@@ -108,12 +108,21 @@ func runUpload(cmd *Command, args []string) bool {
}
return err
})
+ if err != nil {
+ fmt.Println(err.Error())
+ return false
+ }
} else {
parts, e := operation.NewFileParts(args)
if e != nil {
fmt.Println(e.Error())
+ return false
+ }
+ results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ if err != nil {
+ fmt.Println(err.Error())
+ return false
}
- results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
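
The upload.go changes above mostly add error propagation: an error returned from the walk callback aborts the walk and is returned by filepath.Walk itself, so the caller has to check it, and SubmitFiles errors are now reported instead of ignored. A minimal sketch of the walk pattern (the upload step is replaced by a print):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err // aborts the walk; surfaced by filepath.Walk below
		}
		if !info.IsDir() {
			fmt.Println("would upload:", path)
		}
		return nil
	})
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}
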
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 139a3791e..235eff11b 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -35,31 +35,32 @@ var (
)
type VolumeServerOptions struct {
- port *int
- publicPort *int
- folders []string
- folderMaxLimits []int
- idxFolder *string
- ip *string
- publicUrl *string
- bindIp *string
- masters *string
- idleConnectionTimeout *int
- dataCenter *string
- rack *string
- whiteList []string
- indexType *string
- diskType *string
- fixJpgOrientation *bool
- readRedirect *bool
- cpuProfile *string
- memProfile *string
- compactionMBPerSecond *int
- fileSizeLimitMB *int
- concurrentUploadLimitMB *int
- pprof *bool
- preStopSeconds *int
- metricsHttpPort *int
+ port *int
+ publicPort *int
+ folders []string
+ folderMaxLimits []int
+ idxFolder *string
+ ip *string
+ publicUrl *string
+ bindIp *string
+ masters *string
+ idleConnectionTimeout *int
+ dataCenter *string
+ rack *string
+ whiteList []string
+ indexType *string
+ diskType *string
+ fixJpgOrientation *bool
+ readMode *string
+ cpuProfile *string
+ memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ concurrentUploadLimitMB *int
+ concurrentDownloadLimitMB *int
+ pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
enableTcp *bool
}
@@ -80,12 +81,13 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
+ v.readMode = cmdVolume.Flag.String("readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
- v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
+ v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
@@ -228,10 +230,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList,
- *v.fixJpgOrientation, *v.readRedirect,
+ *v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
+ int64(*v.concurrentDownloadLimitMB)*1024*1024,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -259,6 +262,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// Stop heartbeats
if !volumeServer.StopHeartbeat() {
+ volumeServer.SetStopping()
glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
}