| field | value | date |
|---|---|---|
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-08-16 00:54:51 +0800 |
| committer | GitHub <noreply@github.com> | 2021-08-16 00:54:51 +0800 |
| commit | 27c05f8c0b5c7bda43babeb61d79684d11851111 (patch) | |
| tree | d235573112ce168ca904acbc3932ed12e94de80c /weed/command | |
| parent | 97ad3e9d027216d74132652d4d899c7fc7c33ab1 (diff) | |
| parent | ec989b037717f8fd7f0ed3bbc80f0a33654fe7aa (diff) | |
| download | seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.tar.xz, seaweedfs-27c05f8c0b5c7bda43babeb61d79684d11851111.zip | |
Merge pull request #80 from chrislusf/master
sync
Diffstat (limited to 'weed/command')
32 files changed, 1267 insertions, 867 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go new file mode 100644 index 000000000..9a545a183 --- /dev/null +++ b/weed/command/autocomplete.go @@ -0,0 +1,109 @@ +package command + +import ( + "fmt" + flag "github.com/chrislusf/seaweedfs/weed/util/fla9" + "github.com/posener/complete" + completeinstall "github.com/posener/complete/cmd/install" + "runtime" +) + +func AutocompleteMain(commands []*Command) bool { + subCommands := make(map[string]complete.Command) + helpSubCommands := make(map[string]complete.Command) + for _, cmd := range commands { + flags := make(map[string]complete.Predictor) + cmd.Flag.VisitAll(func(flag *flag.Flag) { + flags["-"+flag.Name] = complete.PredictAnything + }) + + subCommands[cmd.Name()] = complete.Command{ + Flags: flags, + } + helpSubCommands[cmd.Name()] = complete.Command{} + } + subCommands["help"] = complete.Command{Sub: helpSubCommands} + + globalFlags := make(map[string]complete.Predictor) + flag.VisitAll(func(flag *flag.Flag) { + globalFlags["-"+flag.Name] = complete.PredictAnything + }) + + weedCmd := complete.Command{ + Sub: subCommands, + Flags: globalFlags, + GlobalFlags: complete.Flags{"-h": complete.PredictNothing}, + } + cmp := complete.New("weed", weedCmd) + + return cmp.Complete() +} + +func installAutoCompletion() bool { + if runtime.GOOS == "windows" { + fmt.Println("windows is not supported") + return false + } + + err := completeinstall.Install("weed") + if err != nil { + fmt.Printf("install failed! %s\n", err) + return false + } + fmt.Printf("autocompletion is enabled. Please restart your shell.\n") + return true +} + +func uninstallAutoCompletion() bool { + if runtime.GOOS == "windows" { + fmt.Println("windows is not supported") + return false + } + + err := completeinstall.Uninstall("weed") + if err != nil { + fmt.Printf("uninstall failed! %s\n", err) + return false + } + fmt.Printf("autocompletion is disabled. Please restart your shell.\n") + return true +} + +var cmdAutocomplete = &Command{ + Run: runAutocomplete, + UsageLine: "autocomplete", + Short: "install autocomplete", + Long: `weed autocomplete is installed in the shell. + + Supported shells are bash, zsh, and fish. + Windows is not supported. + +`, +} + +func runAutocomplete(cmd *Command, args []string) bool { + if len(args) != 0 { + cmd.Usage() + } + + return installAutoCompletion() +} + +var cmdUnautocomplete = &Command{ + Run: runUnautocomplete, + UsageLine: "autocomplete.uninstall", + Short: "uninstall autocomplete", + Long: `weed autocomplete is uninstalled in the shell. + + Windows is not supported.
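For readers unfamiliar with github.com/posener/complete, the new autocomplete.go above registers every weed subcommand and its flags in a completion tree and hands that tree to the library before normal argument parsing. A minimal, self-contained sketch of the same wiring; the tool name `mytool` and its single `backup` subcommand are made up for illustration:

```go
package main

import (
	"fmt"
	"os"

	"github.com/posener/complete"
)

func main() {
	// Describe the CLI shape: subcommands with their flags, plus global flags.
	cli := complete.Command{
		Sub: complete.Commands{
			"backup": {Flags: complete.Flags{"-dir": complete.PredictDirs("*")}},
			"help":   {},
		},
		GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
	}
	// Complete() returns true when the shell invoked the binary to ask for
	// completions; the process should then exit without doing real work,
	// which is why AutocompleteMain runs before normal flag parsing.
	if complete.New("mytool", cli).Complete() {
		return
	}
	fmt.Println("normal execution:", os.Args[1:])
}
```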
+ +`, +} + +func runUnautocomplete(cmd *Command, args []string) bool { + if len(args) != 0 { + cmd.Usage() + } + + return uninstallAutoCompletion() +} diff --git a/weed/command/backup.go b/weed/command/backup.go index 207df770b..4c5a2d820 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -72,7 +72,7 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.Lookup(func() string { return *s.master }, vid.String()) + lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 4fedb55f1..f0c8f6139 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -212,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { } var jwtAuthorization security.EncodedJwt if isSecure { - jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid) + jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid) } if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ diff --git a/weed/command/command.go b/weed/command/command.go index 18e53ad8c..8d6525652 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -8,23 +8,26 @@ import ( ) var Commands = []*Command{ - cmdBenchmark, + cmdAutocomplete, + cmdUnautocomplete, cmdBackup, + cmdBenchmark, cmdCompact, - cmdCopy, cmdDownload, cmdExport, cmdFiler, cmdFilerBackup, cmdFilerCat, + cmdFilerCopy, cmdFilerMetaBackup, cmdFilerMetaTail, + cmdFilerRemoteSynchronize, cmdFilerReplicate, cmdFilerSynchronize, cmdFix, cmdFuse, - cmdGateway, cmdMaster, + cmdMasterFollower, cmdMount, cmdS3, cmdIam, diff --git a/weed/command/download.go b/weed/command/download.go index 7bbff9448..a64d3f237 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -2,6 +2,8 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/security" + "google.golang.org/grpc" "io" "io/ioutil" "net/http" @@ -43,20 +45,23 @@ var cmdDownload = &Command{ } func runDownload(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + for _, fid := range args { - if e := downloadToFile(func() string { return *d.server }, fid, util.ResolvePath(*d.dir)); e != nil { + if e := downloadToFile(func() string { return *d.server }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } return true } -func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) error { - fileUrl, lookupError := operation.LookupFileId(masterFn, fileId) +func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, fileId, saveDir string) error { + fileUrl, jwt, lookupError := operation.LookupFileId(masterFn, grpcDialOption, fileId) if lookupError != nil { return lookupError } - filename, _, rc, err := util.DownloadFile(fileUrl) + filename, _, rc, err := util.DownloadFile(fileUrl, jwt) if err != nil { return err } @@ -83,7 +88,7 @@ func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) erro fids := strings.Split(string(content), "\n") for _, partId := range fids { var n int - _, part, err := 
fetchContent(masterFn, partId) + _, part, err := fetchContent(masterFn, grpcDialOption, partId) if err == nil { n, err = f.Write(part) } @@ -103,13 +108,13 @@ func downloadToFile(masterFn operation.GetMasterFn, fileId, saveDir string) erro return nil } -func fetchContent(masterFn operation.GetMasterFn, fileId string) (filename string, content []byte, e error) { - fileUrl, lookupError := operation.LookupFileId(masterFn, fileId) +func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, fileId string) (filename string, content []byte, e error) { + fileUrl, jwt, lookupError := operation.LookupFileId(masterFn, grpcDialOption, fileId) if lookupError != nil { return "", nil, lookupError } var rc *http.Response - if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil { + if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil { return "", nil, e } defer util.CloseResponse(rc) diff --git a/weed/command/filer.go b/weed/command/filer.go index a723b4d8a..ddee0852c 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -50,6 +50,8 @@ type FilerOptions struct { saveToFilerLimit *int defaultLevelDbDirectory *string concurrentUploadLimitMB *int + debug *bool + debugPort *int } func init() { @@ -73,6 +75,8 @@ func init() { f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") + f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -122,6 +126,9 @@ var cmdFiler = &Command{ } func runFiler(cmd *Command, args []string) bool { + if *f.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil) + } util.LoadConfiguration("security", false) diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 888b46fe7..0c450181b 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -1,16 +1,13 @@ package command import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" - "io" "time" ) @@ -52,11 +49,11 @@ var cmdFilerBackup = &Command{ func runFilerBackup(cmd *Command, args []string) bool { - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - util.LoadConfiguration("security", false) util.LoadConfiguration("replication", true) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + for { err := doFilerBackup(grpcDialOption, &filerBackupOptions) if err != nil { @@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) - return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - 
defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "backup_" + dataSink.GetName(), - PathPrefix: sourcePath, - SinceNs: startFrom.UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return fmt.Errorf("processEventFn: %v", err) - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { - return fmt.Errorf("setOffset: %v", err) - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), + sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index a5d29c451..722f64679 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -52,21 +52,21 @@ type CopyOptions struct { } func init() { - cmdCopy.Run = runCopy // break init cycle - cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") - copy.include = cmdCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") - copy.replication = cmdCopy.Flag.String("replication", "", "replication type") - copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") - copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit") - copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") - copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") - copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file") - copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying") + cmdFilerCopy.Run = runCopy // break init cycle + cmdFilerCopy.IsDebug = cmdFilerCopy.Flag.Bool("debug", false, "verbose debug information") + copy.include = cmdFilerCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") + copy.replication = cmdFilerCopy.Flag.String("replication", "", "replication type") + copy.collection = cmdFilerCopy.Flag.String("collection", "", "optional collection name") + copy.ttl = cmdFilerCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") + copy.diskType = cmdFilerCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") + copy.maxMB = cmdFilerCopy.Flag.Int("maxMB", 4, "split
files larger than the limit") + copy.concurrenctFiles = cmdFilerCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctChunks = cmdFilerCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") + copy.checkSize = cmdFilerCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file") + copy.verbose = cmdFilerCopy.Flag.Bool("verbose", false, "print out details during copying") } -var cmdCopy = &Command{ +var cmdFilerCopy = &Command{ UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/", Short: "copy one or a list of files to a filer folder", Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder @@ -154,7 +154,7 @@ func runCopy(cmd *Command, args []string) bool { } copy.ttlSec = int32(ttl.Minutes()) * 60 - if *cmdCopy.IsDebug { + if *cmdFilerCopy.IsDebug { grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") } @@ -213,11 +213,15 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi mode := fi.Mode() uid, gid := util.GetFileUidGid(fi) + fileSize := fi.Size() + if mode.IsDir() { + fileSize = 0 + } fileCopyTaskChan <- FileCopyTask{ sourceLocation: fileOrDir, destinationUrlPath: destPath, - fileSize: fi.Size(), + fileSize: fileSize, fileMode: fi.Mode(), uid: uid, gid: gid, @@ -377,6 +381,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if assignResult.Error != "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) } + if assignResult.Url == "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult) + } return nil }) }) diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index ba0b44659..6fe323fba 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" "google.golang.org/grpc" - "io" "reflect" "time" @@ -53,6 +52,7 @@ The backup writes to another filer store specified in a backup_filer.toml. 
func runFilerMetaBackup(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") // load backup_filer.toml @@ -189,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return nil } - tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "meta_backup", - PathPrefix: *metaBackup.filerDirectory, - SinceNs: startTime.UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - if err = eachEntryFunc(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil { - return err2 - } - } + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3)) + return metaBackup.setOffset(lastTime) + }) - } + return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", + *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) - }) - return tailErr } func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 8451ffd78..28c0db99b 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -3,16 +3,15 @@ package command import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/golang/protobuf/jsonpb" jsoniter "github.com/json-iterator/go" "github.com/olivere/elastic/v7" - "io" "os" "path/filepath" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -45,6 +44,7 @@ var ( func runFilerMetaTail(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var filterFunc func(dir, fname string) bool @@ -103,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "tail", - PathPrefix: *tailTarget, - SinceNs: time.Now().Add(-*tailStart).UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + tailErr 
:= pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", + *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0, + func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { - continue + return nil } - if err = eachEntryFunc(resp); err != nil { + if err := eachEntryFunc(resp); err != nil { return err } - } + return nil + }, false) - }) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go new file mode 100644 index 000000000..8b20957e4 --- /dev/null +++ b/weed/command/filer_remote_sync.go @@ -0,0 +1,257 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "time" +) + +type RemoteSyncOptions struct { + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + debug *bool + timeAgo *time.Duration + dir *string +} + +const ( + RemoteSyncKeyPrefix = "remote.sync." +) + +var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) + +func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) +} +func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +var ( + remoteSyncOptions RemoteSyncOptions +) + +func init() { + cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle + remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") + remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") + remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") + remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") + remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerRemoteSynchronize = &Command{ + UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud", + Short: "resumable continuous write-back of updates to remote storage if the directory is mounted to the remote storage", + Long: `resumable continuous write-back of updates to remote storage if the directory is mounted to the remote storage + + filer.remote.sync listens on filer update events. + If any mounted remote file is updated, it will fetch the updated content, + and write to the remote storage.
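The new filer.remote.sync command that continues below classifies each metadata event by which of OldEntry and NewEntry are set. A simplified sketch of that dispatch, using a stand-in Entry type instead of the real filer_pb messages:

```go
package main

import "fmt"

// Entry stands in for *filer_pb.Entry in this sketch.
type Entry struct{ Name string }

// classify mirrors the event dispatch in followUpdatesAndUploadToRemote:
// only NewEntry set means create, only OldEntry set means delete, and
// both set means update (possibly a rename or a metadata-only change).
func classify(oldEntry, newEntry *Entry) string {
	switch {
	case oldEntry == nil && newEntry == nil:
		return "noop"
	case oldEntry == nil:
		return "create: write the file or directory to remote storage"
	case newEntry == nil:
		return "delete: remove the remote object"
	default:
		return "update: compare chunks; identical chunks means metadata-only update"
	}
}

func main() {
	fmt.Println(classify(nil, &Entry{Name: "a.txt"}))
	fmt.Println(classify(&Entry{Name: "a.txt"}, nil))
	fmt.Println(classify(&Entry{Name: "a.txt"}, &Entry{Name: "b.txt"}))
}
```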
+`, +} + +func runFilerRemoteSynchronize(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + remoteSyncOptions.grpcDialOption = grpcDialOption + + // read filer remote storage mount mappings + mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress) + if readErr != nil { + fmt.Printf("read mount mapping: %v", readErr) + return false + } + + filerSource := &source.FilerSource{} + filerSource.DoInitialize( + *remoteSyncOptions.filerAddress, + pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress), + "/", // does not matter + *remoteSyncOptions.readChunkFromFiler, + ) + + var found bool + for dir, remoteStorageMountLocation := range mappings.Mappings { + if *remoteSyncOptions.dir == dir { + found = true + storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name) + if readErr != nil { + fmt.Printf("read remote storage configuration for %s: %v", dir, readErr) + continue + } + fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir) + if err := util.Retry("filer.remote.sync "+dir, func() error { + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) + }); err != nil { + fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err) + } + break + } + } + if !found { + fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir) + return false + } + + return true +} + +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error { + + dirHash := util.HashStringToLong(mountedDir) + + // 1. specified by timeAgo + // 2. last offset timestamp for this directory + // 3. 
directory creation time + var lastOffsetTs time.Time + if *option.timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) + if err != nil { + return fmt.Errorf("lookup %s: %v", mountedDir, err) + } + + lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } + } else { + lastOffsetTs = time.Now().Add(-*option.timeAgo) + } + + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if len(message.NewEntry.Chunks) == 0 { + return nil + } + fmt.Printf("create: %+v\n", resp) + if !shouldSendToRemote(message.NewEntry) { + fmt.Printf("skipping creating: %+v\n", resp) + return nil + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + return client.WriteDirectory(dest, message.NewEntry) + } + reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if message.OldEntry != nil && message.NewEntry == nil { + fmt.Printf("delete: %+v\n", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if !shouldSendToRemote(message.NewEntry) { + fmt.Printf("skipping updating: %+v\n", resp) + return nil + } + if message.NewEntry.IsDirectory { + return client.WriteDirectory(dest, message.NewEntry) + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) { + fmt.Printf("update meta: %+v\n", resp) + return client.UpdateFileMetadata(dest, message.NewEntry) + } + } + fmt.Printf("update: %+v\n", resp) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + + return nil + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", 
*option.filerAddress, lastTime, float64(counter)/float64(3)) + return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) + }) + + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, + "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation { + source := string(sourcePath[len(mountDir):]) + dest := util.FullPath(remoteMountLocation.Path).Child(source) + return &filer_pb.RemoteStorageLocation{ + Name: remoteMountLocation.Name, + Bucket: remoteMountLocation.Bucket, + Path: string(dest), + } +} + +func isSameChunks(a, b []*filer_pb.FileChunk) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + x, y := a[i], b[i] + if !proto.Equal(x, y) { + return false + } + } + return true +} + +func shouldSendToRemote(entry *filer_pb.Entry) bool { + if entry.RemoteEntry == nil { + return true + } + if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime { + return true + } + return false +} + +func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + entry.RemoteEntry = remoteEntry + return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) +} diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 885c95540..bf0a3e140 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -7,12 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/replication" "github.com/chrislusf/seaweedfs/weed/replication/sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" ) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 52fc0b477..5440811dd 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -15,7 +15,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" - "io" "strings" "time" ) @@ -71,8 +70,8 @@ func init() { var cmdFilerSynchronize = &Command{ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>", - Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters", - Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers + Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters", + Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers filer.sync listens on filer notifications. 
If any file is updated, it will fetch the updated content, and write to the other destination. Different from filer.replicate: @@ -89,6 +88,7 @@ var cmdFilerSynchronize = &Command{ func runFilerSynchronize(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) @@ -165,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return persistEventFn(resp) } - return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "syncTo_" + targetFiler, - PathPrefix: sourcePath, - SinceNs: sourceFilerOffsetTsNs, - Signature: targetFilerSignature, - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { - return err - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, + sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + } const ( diff --git a/weed/command/fuse.go b/weed/command/fuse.go index 74cf2bb70..a0dcaa86c 100644 --- a/weed/command/fuse.go +++ b/weed/command/fuse.go @@ -2,10 +2,10 @@ package command import ( "fmt" - "strings" + "os" "strconv" + "strings" "time" - "os" ) func init() { @@ -13,7 +13,7 @@ func init() { } type parameter struct { - name string + name string value string } @@ -22,6 +22,8 @@ func runFuse(cmd *Command, args []string) bool { rawArgsLen := len(rawArgs) option := strings.Builder{} options := []parameter{} + masterProcess := true + fusermountPath := "" // first parameter i := 0 @@ -40,7 +42,7 @@ func runFuse(cmd *Command, args []string) bool { option.Reset() } - // dash separator read option until next space + // dash separator read option until next space } else if rawArgs[i] == '-' { for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ { option.WriteByte(rawArgs[i]) @@ -48,25 +50,25 @@ func runFuse(cmd *Command, args []string) bool { options = append(options, parameter{option.String(), "true"}) option.Reset() - // equal separator start option with pending value + // equal separator start option with pending value } else if rawArgs[i] == '=' { name := option.String() option.Reset() - for i++; i < rawArgsLen && rawArgs[i] != ','; i++ { + for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ { // double 
quote separator read option until next double quote if rawArgs[i] == '"' { for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ { option.WriteByte(rawArgs[i]) } - // single quote separator read option until next single quote + // single quote separator read option until next single quote } else if rawArgs[i] == '\'' { for i++; i < rawArgsLen && rawArgs[i] != '\''; i++ { option.WriteByte(rawArgs[i]) } - // add chars before comma + // add chars before comma } else if rawArgs[i] != ' ' { option.WriteByte(rawArgs[i]) } @@ -75,12 +77,12 @@ func runFuse(cmd *Command, args []string) bool { options = append(options, parameter{name, option.String()}) option.Reset() - // comma separator just read current option + // comma separator just read current option } else if rawArgs[i] == ',' { options = append(options, parameter{option.String(), "true"}) option.Reset() - // what is not a separator fill option buffer + // what is not a separator fill option buffer } else { option.WriteByte(rawArgs[i]) } @@ -97,7 +99,9 @@ func runFuse(cmd *Command, args []string) bool { for i := 0; i < len(options); i++ { parameter := options[i] - switch parameter.name { + switch parameter.name { + case "child": + masterProcess = false case "arg0": mountOptions.dir = ¶meter.value case "filer": @@ -105,7 +109,7 @@ func runFuse(cmd *Command, args []string) bool { case "filer.path": mountOptions.filerMountRootPath = ¶meter.value case "dirAutoCreate": - if parsed, err := strconv.ParseBool(parameter.value); err != nil { + if parsed, err := strconv.ParseBool(parameter.value); err == nil { mountOptions.dirAutoCreate = &parsed } else { panic(fmt.Errorf("dirAutoCreate: %s", err)) @@ -117,14 +121,14 @@ func runFuse(cmd *Command, args []string) bool { case "disk": mountOptions.diskType = ¶meter.value case "ttl": - if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil { + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { intValue := int(parsed) mountOptions.ttlSec = &intValue } else { panic(fmt.Errorf("ttl: %s", err)) } case "chunkSizeLimitMB": - if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil { + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { intValue := int(parsed) mountOptions.chunkSizeLimitMB = &intValue } else { @@ -132,7 +136,7 @@ func runFuse(cmd *Command, args []string) bool { } case "concurrentWriters": i++ - if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err != nil { + if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil { intValue := int(parsed) mountOptions.concurrentWriters = &intValue } else { @@ -141,7 +145,7 @@ func runFuse(cmd *Command, args []string) bool { case "cacheDir": mountOptions.cacheDir = ¶meter.value case "cacheCapacityMB": - if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err != nil { + if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err == nil { mountOptions.cacheSizeMB = &parsed } else { panic(fmt.Errorf("cacheCapacityMB: %s", err)) @@ -149,7 +153,7 @@ func runFuse(cmd *Command, args []string) bool { case "dataCenter": mountOptions.dataCenter = ¶meter.value case "allowOthers": - if parsed, err := strconv.ParseBool(parameter.value); err != nil { + if parsed, err := strconv.ParseBool(parameter.value); err == nil { mountOptions.allowOthers = &parsed } else { panic(fmt.Errorf("allowOthers: %s", err)) @@ -157,7 +161,7 @@ func runFuse(cmd *Command, args []string) bool { case "umask": mountOptions.umaskString = ¶meter.value case "nonempty": - if parsed, err := 
strconv.ParseBool(parameter.value); err != nil { + if parsed, err := strconv.ParseBool(parameter.value); err == nil { mountOptions.nonempty = &parsed } else { panic(fmt.Errorf("nonempty: %s", err)) } @@ -169,7 +173,7 @@ func runFuse(cmd *Command, args []string) bool { case "map.gid": mountOptions.gidMap = &parameter.value case "readOnly": - if parsed, err := strconv.ParseBool(parameter.value); err != nil { + if parsed, err := strconv.ParseBool(parameter.value); err == nil { mountOptions.readOnly = &parsed } else { panic(fmt.Errorf("readOnly: %s", err)) } @@ -179,17 +183,51 @@ func runFuse(cmd *Command, args []string) bool { case "memprofile": mountMemProfile = &parameter.value case "readRetryTime": - if parsed, err := time.ParseDuration(parameter.value); err != nil { + if parsed, err := time.ParseDuration(parameter.value); err == nil { mountReadRetryTime = &parsed } else { panic(fmt.Errorf("readRetryTime: %s", err)) } + case "fusermount.path": + fusermountPath = parameter.value + } + } + + // the master starts the child process, releases it, and then finishes + if masterProcess { + arg0, err := os.Executable() + if err != nil { + panic(err) + } + + argv := append(os.Args, "-o", "child") + + attr := os.ProcAttr{} + attr.Env = os.Environ() + + child, err := os.StartProcess(arg0, argv, &attr) + + if err != nil { + panic(fmt.Errorf("master process can not start child process: %s", err)) } + + err = child.Release() + + if err != nil { + panic(fmt.Errorf("master process can not release child process: %s", err)) + } + + return true } - // I don't know why PATH environment variable is lost - if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin"); err != nil { - panic(fmt.Errorf("setenv: %s", err)) + if fusermountPath != "" { + if err := os.Setenv("PATH", fusermountPath); err != nil { + panic(fmt.Errorf("setenv: %s", err)) + } + } else if os.Getenv("PATH") == "" { + if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil { + panic(fmt.Errorf("setenv: %s", err)) + } } // just call "weed mount" command @@ -198,7 +236,7 @@ func runFuse(cmd *Command, args []string) bool { var cmdFuse = &Command{ UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"", - Short: "Allow using weed with linux's mount command", + Short: "Allow using weed with linux's mount command", Long: `Allow using weed with linux's mount command You can use -t weed on mount command: diff --git a/weed/command/gateway.go b/weed/command/gateway.go deleted file mode 100644 index 8a6f852a5..000000000 --- a/weed/command/gateway.go +++ /dev/null @@ -1,93 +0,0 @@ -package command - -import ( - "net/http" - "strconv" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - gatewayOptions GatewayOptions -) - -type GatewayOptions struct { - masters *string - filers *string - bindIp *string - port *int - maxMB *int -} - -func init() { - cmdGateway.Run = runGateway // break init cycle - gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers") - gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers") - gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to") - gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port") - gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit") -} - -var
cmdGateway = &Command{ - UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*", - Short: "start a gateway server that points to a list of master servers or a list of filers", - Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages. - - POST /blobs/ - upload the blob and return a chunk id - DELETE /blobs/<chunk_id> - delete a chunk id - - /* - POST /files/path/to/a/file - save /path/to/a/file on filer - DELETE /files/path/to/a/file - delete /path/to/a/file on filer - - POST /topics/topicName - save on filer to /topics/topicName/<ds>/ts.json - */ -`, -} - -func runGateway(cmd *Command, args []string) bool { - - util.LoadConfiguration("security", false) - - gatewayOptions.startGateway() - - return true -} - -func (gw *GatewayOptions) startGateway() { - - defaultMux := http.NewServeMux() - - _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{ - Masters: strings.Split(*gw.masters, ","), - Filers: strings.Split(*gw.filers, ","), - MaxMB: *gw.maxMB, - }) - if gws_err != nil { - glog.Fatalf("Gateway startup error: %v", gws_err) - } - - glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port) - gatewayListener, e := util.NewListener( - *gw.bindIp+":"+strconv.Itoa(*gw.port), - time.Duration(10)*time.Second, - ) - if e != nil { - glog.Fatalf("Filer listener error: %v", e) - } - - httpS := &http.Server{Handler: defaultMux} - if err := httpS.Serve(gatewayListener); err != nil { - glog.Fatalf("Gateway Fail to serve: %v", e) - } - -} diff --git a/weed/command/iam.go b/weed/command/iam.go index 17d0832cb..ed4eea543 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -49,6 +49,7 @@ func (iamopt *IamOptions) startIamServer() bool { return false } + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/command/imports.go b/weed/command/imports.go new file mode 100644 index 000000000..ce0bf0e10 --- /dev/null +++ b/weed/command/imports.go @@ -0,0 +1,30 @@ +package command + +import ( + _ "net/http/pprof" + + _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" + + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" + _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" + + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ 
"github.com/chrislusf/seaweedfs/weed/filer/sqlite" +) diff --git a/weed/command/master.go b/weed/command/master.go index 0f5e2156d..4eb43ee09 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -54,7 +54,7 @@ func init() { m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") // m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") + m.defaultReplication = cmdMaster.Flag.String("defaultReplication", "", "Default replication type if not specified.") m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces") m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") @@ -198,7 +198,7 @@ func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOp Host: *m.ip, Port: *m.port, MetaFolder: *m.metaFolder, - VolumeSizeLimitMB: *m.volumeSizeLimitMB, + VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), VolumePreallocate: *m.volumePreallocate, // PulseSeconds: *m.pulseSeconds, DefaultReplicaPlacement: *m.defaultReplication, diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go new file mode 100644 index 000000000..b628f7abf --- /dev/null +++ b/weed/command/master_follower.go @@ -0,0 +1,143 @@ +package command + +import ( + "context" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" + "google.golang.org/grpc/reflection" + "net/http" + "strconv" + "strings" + "time" +) + +var ( + mf MasterOptions +) + +func init() { + cmdMasterFollower.Run = runMasterFollower // break init cycle + mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port") + mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to") + mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") + + mf.ip = aws.String(util.DetectedHostAddress()) + mf.metaFolder = aws.String("") + mf.volumeSizeLimitMB = nil + mf.volumePreallocate = nil + mf.defaultReplication = nil + mf.garbageThreshold = aws.Float64(0.1) + mf.whiteList = nil + mf.disableHttp = aws.Bool(false) + mf.metricsAddress = aws.String("") + mf.metricsIntervalSec = aws.Int(0) + mf.raftResumeState = aws.Bool(false) +} + +var cmdMasterFollower = &Command{ + UsageLine: "master.follower -port=9333 -masters=<master1Host>:<master1Port>", + Short: "start a master follower", + Long: `start a master follower to provide volume=>location mapping service + + The master follower does not participate in master election. + It just follow the existing masters, and listen for any volume location changes. + + In most cases, the master follower is not needed. In big data centers with thousands of volume + servers. 
the master may have trouble keeping up with the write requests and read requests. + + The master follower can relieve the master from read requests, which only need to + look up a fileId or volumeId. + + The master follower can currently handle fileId lookup requests: + /dir/lookup?volumeId=4 + /dir/lookup?fileId=4,49c50924569199 + And gRPC API + rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {} + + This master follower is stateless and can run from any place. + + `, +} + +func runMasterFollower(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + util.LoadConfiguration("master", false) + + startMasterFollower(mf) + + return true +} + +func startMasterFollower(masterOptions MasterOptions) { + + // collect settings from main masters + masters := strings.Split(*mf.peers, ",") + masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters) + if err != nil { + glog.V(0).Infof("ParseServersToGrpcAddresses: %v", err) + return + } + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") + for i := 0; i < 10; i++ { + err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err) + } + masterOptions.defaultReplication = &resp.DefaultReplication + masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB)) + masterOptions.volumePreallocate = &resp.VolumePreallocate + return nil + }) + if err != nil { + glog.V(0).Infof("failed to talk to master %v: %v", masterGrpcAddresses, err) + glog.V(0).Infof("wait for %d seconds ...", i+1) + time.Sleep(time.Duration(i+1) * time.Second) + } + } + if err != nil { + glog.Errorf("failed to talk to master %v: %v", masterGrpcAddresses, err) + return + } + + option := masterOptions.toMasterOption(nil) + option.IsFollower = true + + r := mux.NewRouter() + ms := weed_server.NewMasterServer(r, option, masters) + listeningAddress := *masterOptions.ipBind + ":" + strconv.Itoa(*masterOptions.port) + glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) + masterListener, e := util.NewListener(listeningAddress, 0) + if e != nil { + glog.Fatalf("Master startup error: %v", e) + } + + // starting grpc server + grpcPort := *masterOptions.port + 10000 + grpcL, err := util.NewListener(*masterOptions.ipBind+":"+strconv.Itoa(grpcPort), 0) + if err != nil { + glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) + master_pb.RegisterSeaweedServer(grpcS, ms) + reflection.Register(grpcS) + glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort) + go grpcS.Serve(grpcL) + + go ms.MasterClient.KeepConnectedToMaster() + + // start http server + httpS := &http.Server{Handler: r} + go httpS.Serve(masterListener) + + select {} +} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index dce2197d6..cdf340067 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -246,6 +246,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir) server := fs.New(c, nil) seaweedFileSystem.Server = server +
seaweedFileSystem.StartBackgroundTasks() err = server.Serve(seaweedFileSystem) // check if the mount process has an error to report diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 2d6729bd3..886c0ac5e 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -1,6 +1,8 @@ package command import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/command/scaffold" "io/ioutil" "path/filepath" ) @@ -35,17 +37,17 @@ func runScaffold(cmd *Command, args []string) bool { content := "" switch *config { case "filer": - content = FILER_TOML_EXAMPLE + content = scaffold.Filer case "notification": - content = NOTIFICATION_TOML_EXAMPLE + content = scaffold.Notification case "replication": - content = REPLICATION_TOML_EXAMPLE + content = scaffold.Replication case "security": - content = SECURITY_TOML_EXAMPLE + content = scaffold.Security case "master": - content = MASTER_TOML_EXAMPLE + content = scaffold.Master case "shell": - content = SHELL_TOML_EXAMPLE + content = scaffold.Shell } if content == "" { println("need a valid -config option") @@ -55,519 +57,7 @@ func runScaffold(cmd *Command, args []string) bool { if *outputPath != "" { ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644) } else { - println(content) + fmt.Println(content) } return true } - -const ( - FILER_TOML_EXAMPLE = ` -# A sample TOML config file for SeaweedFS filer store -# Used with "weed filer" or "weed server -filer" -# Put this file to one of the location, with descending priority -# ./filer.toml -# $HOME/.seaweedfs/filer.toml -# /etc/seaweedfs/filer.toml - -#################################################### -# Customizable filer server options -#################################################### -[filer.options] -# with http DELETE, by default the filer would check whether a folder is empty. -# recursive_delete will delete all sub folders and files, similar to "rm -Rf" -recursive_delete = false -# directories under this folder will be automatically creating a separate bucket -buckets_folder = "/buckets" - -#################################################### -# The following are filer store options -#################################################### - -[leveldb2] -# local on disk, mostly for simple single-machine setup, fairly scalable -# faster than previous leveldb, recommended. -enabled = true -dir = "./filerldb2" # directory to store level db files - -[leveldb3] -# similar to leveldb2. -# each bucket has its own meta store. 
@@ -55,519 +57,7 @@ func runScaffold(cmd *Command, args []string) bool {
 	if *outputPath != "" {
 		ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644)
 	} else {
-		println(content)
+		fmt.Println(content)
 	}
 	return true
 }
-
-const (
-	FILER_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used with "weed filer" or "weed server -filer"
-# Put this file in one of the locations, with descending priority
-# ./filer.toml
-# $HOME/.seaweedfs/filer.toml
-# /etc/seaweedfs/filer.toml
-
-####################################################
-# Customizable filer server options
-####################################################
-[filer.options]
-# with http DELETE, by default the filer would check whether a folder is empty.
-# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
-recursive_delete = false
-# directories under this folder will automatically create a separate bucket
-buckets_folder = "/buckets"
-
-####################################################
-# The following are filer store options
-####################################################
-
-[leveldb2]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-# faster than previous leveldb, recommended.
-enabled = true
-dir = "./filerldb2"    # directory to store level db files
-
-[leveldb3]
-# similar to leveldb2.
-# each bucket has its own meta store.
-enabled = false
-dir = "./filerldb3"    # directory to store level db files
-
-[rocksdb]
-# local on disk, similar to leveldb
-# since it is using a C wrapper, you need to install rocksdb and build it by yourself
-enabled = false
-dir = "./filerrdb"     # directory to store rocksdb files
-
-[sqlite]
-# local on disk, similar to leveldb
-enabled = false
-dbFile = "./filer.db"  # sqlite db file
-
-[mysql]  # or memsql, tidb
-# CREATE TABLE IF NOT EXISTS filemeta (
-#   dirhash   BIGINT               COMMENT 'first 64 bits of MD5 hash value of directory field',
-#   name      VARCHAR(1000) BINARY COMMENT 'directory or file name',
-#   directory TEXT                 COMMENT 'full path to parent directory',
-#   meta      LONGBLOB,
-#   PRIMARY KEY (dirhash, name)
-# ) DEFAULT CHARSET=utf8;
-
-enabled = false
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = ""  # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[mysql2]  # or memsql, tidb
-enabled = false
-createTable = """
-  CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
-    dirhash BIGINT,
-    name VARCHAR(1000) BINARY,
-    directory TEXT,
-    meta LONGBLOB,
-    PRIMARY KEY (dirhash, name)
-  ) DEFAULT CHARSET=utf8;
-"""
-hostname = "localhost"
-port = 3306
-username = "root"
-password = ""
-database = ""  # create or use an existing database
-connection_max_idle = 2
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-interpolateParams = false
-# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
-
-[postgres]  # or cockroachdb, YugabyteDB
-# CREATE TABLE IF NOT EXISTS filemeta (
-#   dirhash   BIGINT,
-#   name      VARCHAR(65535),
-#   directory VARCHAR(65535),
-#   meta      bytea,
-#   PRIMARY KEY (dirhash, name)
-# );
-enabled = false
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres"  # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[postgres2]
-enabled = false
-createTable = """
-  CREATE TABLE IF NOT EXISTS "%s" (
-    dirhash   BIGINT,
-    name      VARCHAR(65535),
-    directory VARCHAR(65535),
-    meta      bytea,
-    PRIMARY KEY (dirhash, name)
-  );
-"""
-hostname = "localhost"
-port = 5432
-username = "postgres"
-password = ""
-database = "postgres"  # create or use an existing database
-schema = ""
-sslmode = "disable"
-connection_max_idle = 100
-connection_max_open = 100
-connection_max_lifetime_seconds = 0
-# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
-enableUpsert = true
-upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
-
-[cassandra]
-# CREATE TABLE filemeta (
-#   directory varchar,
-#   name varchar,
-#   meta blob,
-#   PRIMARY KEY (directory, name)
-# ) WITH CLUSTERING ORDER BY (name ASC);
-enabled = false
-keyspace="seaweedfs"
-hosts=[
-	"localhost:9042",
-]
-username=""
-password=""
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[hbase]
-enabled = false
-zkquorum = ""
-table = "seaweedfs"
-
-[redis2]
-enabled = false
-address = "localhost:6379"
-password = ""
-database = 0
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[redis_cluster2]
-enabled = false
-addresses = [
-  "localhost:30001",
-  "localhost:30002",
-  "localhost:30003",
-  "localhost:30004",
-  "localhost:30005",
-  "localhost:30006",
-]
-password = ""
-# allows reads from slave servers or the master, but all writes still go to the master
-readOnly = false
-# automatically use the closest Redis server for reads
-routeByLatency = false
-# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
-superLargeDirectories = []
-
-[etcd]
-enabled = false
-servers = "localhost:2379"
-timeout = "3s"
-
-[mongodb]
-enabled = false
-uri = "mongodb://localhost:27017"
-option_pool_size = 0
-database = "seaweedfs"
-
-[elastic7]
-enabled = false
-servers = [
-  "http://localhost1:9200",
-  "http://localhost2:9200",
-  "http://localhost3:9200",
-]
-username = ""
-password = ""
-sniff_enabled = false
-healthcheck_enabled = false
-# increasing the value is recommended; be sure the value in Elastic is greater than or equal to the value here
-index.max_result_window = 10000
-
-
-
-##########################
-##########################
-# To add a path-specific filer store:
-#
-# 1. Add a name following the store type, separated by a dot ".". E.g., cassandra.tmp
-# 2. Add a location configuration. E.g., location = "/tmp/"
-# 3. Copy and customize all other configurations.
-#    Make sure they are not the same if using the same store type!
-# 4. Set enabled to true
-#
-# The following is just using redis as an example
-##########################
-[redis2.tmp]
-enabled = false
-location = "/tmp/"
-address = "localhost:6379"
-password = ""
-database = 1
-
-`
-
-	NOTIFICATION_TOML_EXAMPLE = `
-# A sample TOML config file for SeaweedFS filer store
-# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate"
-# Put this file in one of the locations, with descending priority
-# ./notification.toml
-# $HOME/.seaweedfs/notification.toml
-# /etc/seaweedfs/notification.toml
-
-####################################################
-# notification
-# send and receive filer updates for each file to an external message queue
-####################################################
-[notification.log]
-# this is only for debugging purposes and does not work with "weed filer.replicate"
-enabled = false
-
-
-[notification.kafka]
-enabled = false
-hosts = [
-  "localhost:9092"
-]
-topic = "seaweedfs_filer"
-offsetFile = "./last.offset"
-offsetSaveIntervalSeconds = 10
-
-
-[notification.aws_sqs]
-# experimental, let me know if it works
-enabled = false
-aws_access_key_id = ""             # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = ""         # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-sqs_queue_name = "my_filer_queue"  # an existing queue name
-
-
-[notification.google_pub_sub]
-# read the credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json"  # path to json credential file
-project_id = ""                    # an existing project id
-topic = "seaweedfs_filer_topic"    # a topic, auto-created if it does not exist
-
-[notification.gocdk_pub_sub]
-# The Go Cloud Development Kit (https://gocloud.dev).
-# PubSub API (https://godoc.org/gocloud.dev/pubsub).
-# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
-enabled = false
-# This URL will Dial the RabbitMQ server at the URL in the environment
-# variable RABBIT_SERVER_URL and open the exchange "myexchange".
-# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin. Create myexchange of type fanout and myqueue, then
-# create the binding myexchange => myqueue
-topic_url = "rabbit://myexchange"
-sub_url = "rabbit://myqueue"
-`
-
-	REPLICATION_TOML_EXAMPLE = `
-# A sample TOML config file for replicating SeaweedFS filer
-# Used with "weed filer.backup"
-# Using it with "weed filer.replicate" is deprecated.
-# Put this file in one of the locations, with descending priority
-# ./replication.toml
-# $HOME/.seaweedfs/replication.toml
-# /etc/seaweedfs/replication.toml
-
-[source.filer]  # deprecated. Only useful with "weed filer.replicate"
-enabled = true
-grpcAddress = "localhost:18888"
-# all files under this directory tree are replicated.
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all files with this "prefix" are sent to the notification message queue.
-directory = "/buckets"
-
-[sink.local]
-enabled = false
-directory = "/data"
-# all replicated files are placed under yyyy-mm-dd directories by modified time,
-# so each date directory contains all new and updated files.
-is_incremental = false
-
-[sink.filer]
-enabled = false
-grpcAddress = "localhost:18888"
-# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
-replication = ""
-collection = ""
-ttlSec = 0
-is_incremental = false
-
-[sink.s3]
-# read the credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
-enabled = false
-aws_access_key_id = ""       # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = ""   # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name"  # an existing bucket
-directory = "/"              # destination directory
-endpoint = ""
-is_incremental = false
-
-[sink.google_cloud_storage]
-# read the credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json"  # path to json credential file
-bucket = "your_bucket_seaweedfs"  # an existing bucket
-directory = "/"                   # destination directory
-is_incremental = false
-
-[sink.azure]
-# experimental, let me know if it works
-enabled = false
-account_name = ""
-account_key = ""
-container = "mycontainer"  # an existing container
-directory = "/"            # destination directory
-is_incremental = false
-
-[sink.backblaze]
-enabled = false
-b2_account_id = ""
-b2_master_application_key = ""
-bucket = "mybucket"        # an existing bucket
-directory = "/"            # destination directory
-is_incremental = false
-
-`
-
-	SECURITY_TOML_EXAMPLE = `
-# Put this file in one of the locations, with descending priority
-# ./security.toml
-# $HOME/.seaweedfs/security.toml
-# /etc/seaweedfs/security.toml
-# this file is read by master, volume server, and filer
-
-# the jwt signing key is read by master and volume server.
-# a jwt expires after 10 seconds by default.
-[jwt.signing]
-key = ""
-expires_after_seconds = 10  # seconds
-
-# jwt for read is only supported with a master+volume setup. Filer does not support this mode.
-[jwt.signing.read]
-key = ""
-expires_after_seconds = 10  # seconds
-
-# all grpc tls authentications are mutual
-# the values for the following ca, cert, and key are paths to the PEM files.
-# the host name is not checked, so the PEM files can be shared.
-[grpc]
-ca = ""
-# Set a wildcard domain to enable TLS authentication by common names
-allowed_wildcard_domain = ""  # .mycompany.com
-
-[grpc.volume]
-cert = ""
-key = ""
-allowed_commonNames = ""  # comma-separated SSL certificate common names
-
-[grpc.master]
-cert = ""
-key = ""
-allowed_commonNames = ""  # comma-separated SSL certificate common names
-
-[grpc.filer]
-cert = ""
-key = ""
-allowed_commonNames = ""  # comma-separated SSL certificate common names
-
-[grpc.msg_broker]
-cert = ""
-key = ""
-allowed_commonNames = ""  # comma-separated SSL certificate common names
-
-# use this for any place that needs a grpc client
-# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
-[grpc.client]
-cert = ""
-key = ""
-
-# volume server https options
-# Note: work in progress!
-# this does not yet work with other clients, e.g., "weed filer|mount" etc.
-[https.client]
-enabled = true
-[https.volume]
-cert = ""
-key = ""
-
-
-`
-
-	MASTER_TOML_EXAMPLE = `
-# Put this file in one of the locations, with descending priority
-# ./master.toml
-# $HOME/.seaweedfs/master.toml
-# /etc/seaweedfs/master.toml
-# this file is read by master
-
-[master.maintenance]
-# periodically running these scripts is the same as running them from 'weed shell'
-scripts = """
-  lock
-  ec.encode -fullPercent=95 -quietFor=1h
-  ec.rebuild -force
-  ec.balance -force
-  volume.balance -force
-  volume.fix.replication
-  unlock
-"""
-sleep_minutes = 17  # sleep minutes between each script execution
-
-[master.filer]
-default = "localhost:8888"  # used by maintenance scripts if they need to use fs-related commands
-
-
-[master.sequencer]
-type = "raft"  # Choose [raft|etcd|snowflake] type for storing the file id sequence
-# when sequencer.type = etcd, set the client listen urls of the etcd cluster that stores the file id sequence
-# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = "http://127.0.0.1:2379"
-
-
-# configurations for tiered cloud storage
-# old volumes are transparently moved to cloud for cost efficiency
-[storage.backend]
-	[storage.backend.s3.default]
-	enabled = false
-	aws_access_key_id = ""       # if empty, loads from the shared credentials file (~/.aws/credentials).
-	aws_secret_access_key = ""   # if empty, loads from the shared credentials file (~/.aws/credentials).
-	region = "us-east-2"
-	bucket = "your_bucket_name"  # an existing bucket
-	endpoint = ""
-
-# create this number of logical volumes if no more writable volumes
-# count_x means how many copies of data.
-# e.g.:
-#   000 has only one copy, copy_1
-#   010 and 001 have two copies, copy_2
-#   011 has three copies, copy_3
-[master.volume_growth]
-copy_1 = 7      # create 1 x 7 = 7 actual volumes
-copy_2 = 6      # create 2 x 6 = 12 actual volumes
-copy_3 = 3      # create 3 x 3 = 9 actual volumes
-copy_other = 1  # create n x 1 = n actual volumes
-
-# configuration flags for replication
-[master.replication]
-# any replication counts should be considered minimums. If you specify 010 and
-# have 3 different racks, that's still considered writable. Writes will still
-# try to replicate to all available volumes. You should only use this option
-# if you are doing your own replication or periodic sync of volumes.
-treat_replication_as_minimums = false
-
-`
-	SHELL_TOML_EXAMPLE = `
-
-[cluster]
-default = "c1"
-
-[cluster.c1]
-master = "localhost:9333"  # comma-separated master servers
-filer = "localhost:8888"   # filer host and port
-
-[cluster.c2]
-master = ""
-filer = ""
-
-`
-)
diff --git a/weed/command/scaffold/example.go b/weed/command/scaffold/example.go
new file mode 100644
index 000000000..6be6804e5
--- /dev/null
+++ b/weed/command/scaffold/example.go
@@ -0,0 +1,21 @@
+package scaffold
+
+import _ "embed"
+
+//go:embed filer.toml
+var Filer string
+
+//go:embed notification.toml
+var Notification string
+
+//go:embed replication.toml
+var Replication string
+
+//go:embed security.toml
+var Security string
+
+//go:embed master.toml
+var Master string
+
+//go:embed shell.toml
+var Shell string
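The new scaffold package relies on the //go:embed directive, introduced in Go 1.16, which compiles a file's contents into the binary at build time, so the TOML templates no longer have to live inside Go string constants. A minimal self-contained sketch of the mechanism, using a hypothetical greeting.txt rather than the actual TOML templates:

package main

import (
	_ "embed" // enables the //go:embed directive (Go 1.16+)
	"fmt"
)

// greeting.txt is a hypothetical file that must sit in this package's
// directory at build time; its bytes become the value of the string below.
//go:embed greeting.txt
var greeting string

func main() {
	fmt.Print(greeting)
}

Note that the //go:embed line must immediately precede the variable declaration, and the embedded file must be inside the package directory, which is why each .toml file now lives next to example.go.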
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
new file mode 100644
index 000000000..9e9258865
--- /dev/null
+++ b/weed/command/scaffold/filer.toml
@@ -0,0 +1,232 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used with "weed filer" or "weed server -filer"
+# Put this file in one of the locations, with descending priority
+# ./filer.toml
+# $HOME/.seaweedfs/filer.toml
+# /etc/seaweedfs/filer.toml
+
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer would check whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
+
+####################################################
+# The following are filer store options
+####################################################
+
+[leveldb2]
+# local on disk, mostly for simple single-machine setup, fairly scalable
+# faster than previous leveldb, recommended.
+enabled = true
+dir = "./filerldb2"    # directory to store level db files
+
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3"    # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it is using a C wrapper, you need to install rocksdb and build it by yourself
+enabled = false
+dir = "./filerrdb"     # directory to store rocksdb files
+
+[sqlite]
+# local on disk, similar to leveldb
+enabled = false
+dbFile = "./filer.db"  # sqlite db file
+
+[mysql]  # or memsql, tidb
+# CREATE TABLE IF NOT EXISTS filemeta (
+#   dirhash   BIGINT               COMMENT 'first 64 bits of MD5 hash value of directory field',
+#   name      VARCHAR(1000) BINARY COMMENT 'directory or file name',
+#   directory TEXT                 COMMENT 'full path to parent directory',
+#   meta      LONGBLOB,
+#   PRIMARY KEY (dirhash, name)
+# ) DEFAULT CHARSET=utf8;
+
+enabled = false
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = ""  # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[mysql2]  # or memsql, tidb
+enabled = false
+createTable = """
+  CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
+    dirhash BIGINT,
+    name VARCHAR(1000) BINARY,
+    directory TEXT,
+    meta LONGBLOB,
+    PRIMARY KEY (dirhash, name)
+  ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = ""  # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[postgres]  # or cockroachdb, YugabyteDB
+# CREATE TABLE IF NOT EXISTS filemeta (
+#   dirhash   BIGINT,
+#   name      VARCHAR(65535),
+#   directory VARCHAR(65535),
+#   meta      bytea,
+#   PRIMARY KEY (dirhash, name)
+# );
+enabled = false
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres"  # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[postgres2]
+enabled = false
+createTable = """
+  CREATE TABLE IF NOT EXISTS "%s" (
+    dirhash   BIGINT,
+    name      VARCHAR(65535),
+    directory VARCHAR(65535),
+    meta      bytea,
+    PRIMARY KEY (dirhash, name)
+  );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres"  # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or update the query syntax to match your RDBMS syntax:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[cassandra]
+# CREATE TABLE filemeta (
+#   directory varchar,
+#   name varchar,
+#   meta blob,
+#   PRIMARY KEY (directory, name)
+# ) WITH CLUSTERING ORDER BY (name ASC);
+enabled = false
+keyspace = "seaweedfs"
+hosts = [
+  "localhost:9042",
+]
+username = ""
+password = ""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+# Name of the datacenter local to this filer, used as host selection fallback.
+localDC = ""
+
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
+
+[redis2]
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[redis_cluster2]
+enabled = false
+addresses = [
+  "localhost:30001",
+  "localhost:30002",
+  "localhost:30003",
+  "localhost:30004",
+  "localhost:30005",
+  "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
+[mongodb]
+enabled = false
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+  "http://localhost1:9200",
+  "http://localhost2:9200",
+  "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increasing the value is recommended; be sure the value in Elastic is greater than or equal to the value here
+index.max_result_window = 10000
+
+
+
+##########################
+##########################
+# To add a path-specific filer store:
+#
+# 1. Add a name following the store type, separated by a dot ".". E.g., cassandra.tmp
+# 2. Add a location configuration. E.g., location = "/tmp/"
+# 3. Copy and customize all other configurations.
+#    Make sure they are not the same if using the same store type!
+# 4. Set enabled to true
+#
+# The following is just using redis as an example
+##########################
+[redis2.tmp]
+enabled = false
+location = "/tmp/"
+address = "localhost:6379"
+password = ""
+database = 1
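The path-specific store naming described at the end of filer.toml ("redis2.tmp") encodes a store type plus an instance id in one section name. A small sketch of how such a name could be split; splitStoreName is a hypothetical helper for illustration, not SeaweedFS's actual parser:

package main

import (
	"fmt"
	"strings"
)

// splitStoreName reads a section name such as "redis2.tmp" as
// store type "redis2" plus instance id "tmp".
func splitStoreName(section string) (storeType, id string) {
	parts := strings.SplitN(section, ".", 2)
	if len(parts) == 2 {
		return parts[0], parts[1]
	}
	return section, ""
}

func main() {
	t, id := splitStoreName("redis2.tmp")
	fmt.Println(t, id) // redis2 tmp
}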
diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml
new file mode 100644
index 000000000..020f48e36
--- /dev/null
+++ b/weed/command/scaffold/master.toml
@@ -0,0 +1,63 @@
+# Put this file in one of the locations, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# periodically running these scripts is the same as running them from 'weed shell'
+scripts = """
+  lock
+  ec.encode -fullPercent=95 -quietFor=1h
+  ec.rebuild -force
+  ec.balance -force
+  volume.deleteEmpty -quietFor=24h -force
+  volume.balance -force
+  volume.fix.replication
+  unlock
+"""
+sleep_minutes = 17  # sleep minutes between each script execution
+
+[master.filer]
+default = "localhost:8888"  # used by maintenance scripts if they need to use fs-related commands
+
+
+[master.sequencer]
+type = "raft"  # Choose [raft|etcd|snowflake] type for storing the file id sequence
+# when sequencer.type = etcd, set the client listen urls of the etcd cluster that stores the file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+# when sequencer.type = snowflake, the snowflake id must be different from other masters
+sequencer_snowflake_id = 0  # any number between 1 and 1023
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+[storage.backend.s3.default]
+enabled = false
+aws_access_key_id = ""       # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = ""   # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name"  # an existing bucket
+endpoint = ""
+
+# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+#   000 has only one copy, copy_1
+#   010 and 001 have two copies, copy_2
+#   011 has three copies, copy_3
+[master.volume_growth]
+copy_1 = 7      # create 1 x 7 = 7 actual volumes
+copy_2 = 6      # create 2 x 6 = 12 actual volumes
+copy_3 = 3      # create 3 x 3 = 9 actual volumes
+copy_other = 1  # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
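The copy_N counts in [master.volume_growth] multiply out as replicas times batch size: growing copy_2 volumes with a count of 6 creates 2 x 6 = 12 physical volumes. A quick sketch of the arithmetic:

package main

import "fmt"

func main() {
	// copy_1 = 7, copy_2 = 6, copy_3 = 3, mirroring [master.volume_growth]
	growth := []struct{ copies, count int }{{1, 7}, {2, 6}, {3, 3}}
	for _, g := range growth {
		// each logical volume is grown as `copies` physical volumes
		fmt.Printf("copy_%d: %d x %d = %d actual volumes\n",
			g.copies, g.copies, g.count, g.copies*g.count)
	}
}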
diff --git a/weed/command/scaffold/notification.toml b/weed/command/scaffold/notification.toml
new file mode 100644
index 000000000..f35101edd
--- /dev/null
+++ b/weed/command/scaffold/notification.toml
@@ -0,0 +1,54 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate"
+# Put this file in one of the locations, with descending priority
+# ./notification.toml
+# $HOME/.seaweedfs/notification.toml
+# /etc/seaweedfs/notification.toml
+
+####################################################
+# notification
+# send and receive filer updates for each file to an external message queue
+####################################################
+[notification.log]
+# this is only for debugging purposes and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.kafka]
+enabled = false
+hosts = [
+  "localhost:9092"
+]
+topic = "seaweedfs_filer"
+offsetFile = "./last.offset"
+offsetSaveIntervalSeconds = 10
+
+
+[notification.aws_sqs]
+# experimental, let me know if it works
+enabled = false
+aws_access_key_id = ""             # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = ""         # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue"  # an existing queue name
+
+
+[notification.google_pub_sub]
+# read the credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json"  # path to json credential file
+project_id = ""                    # an existing project id
+topic = "seaweedfs_filer_topic"    # a topic, auto-created if it does not exist
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin. Create myexchange of type fanout and myqueue, then
+# create the binding myexchange => myqueue
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml
new file mode 100644
index 000000000..c463c8077
--- /dev/null
+++ b/weed/command/scaffold/replication.toml
@@ -0,0 +1,71 @@
+# A sample TOML config file for replicating SeaweedFS filer
+# Used with "weed filer.backup"
+# Using it with "weed filer.replicate" is deprecated.
+# Put this file in one of the locations, with descending priority
+# ./replication.toml
+# $HOME/.seaweedfs/replication.toml
+# /etc/seaweedfs/replication.toml
+
+[source.filer]  # deprecated. Only useful with "weed filer.replicate"
+enabled = true
+grpcAddress = "localhost:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to the notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = false
+directory = "/data"
+# all replicated files are placed under yyyy-mm-dd directories by modified time,
+# so each date directory contains all new and updated files.
+is_incremental = false
+
+[sink.filer]
+enabled = false
+grpcAddress = "localhost:18888"
+# all replicated files are under this directory tree
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all received files will be "prefixed" to this directory.
+directory = "/backup"
+replication = ""
+collection = ""
+ttlSec = 0
+is_incremental = false
+
+[sink.s3]
+# read the credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
+# default loads credentials from the shared credentials file (~/.aws/credentials).
+enabled = false
+aws_access_key_id = ""       # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = ""   # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name"  # an existing bucket
+directory = "/"              # destination directory
+endpoint = ""
+is_incremental = false
+
+[sink.google_cloud_storage]
+# read the credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json"  # path to json credential file
+bucket = "your_bucket_seaweedfs"  # an existing bucket
+directory = "/"                   # destination directory
+is_incremental = false
+
+[sink.azure]
+# experimental, let me know if it works
+enabled = false
+account_name = ""
+account_key = ""
+container = "mycontainer"  # an existing container
+directory = "/"            # destination directory
+is_incremental = false
+
+[sink.backblaze]
+enabled = false
+b2_account_id = ""
+b2_master_application_key = ""
+bucket = "mybucket"        # an existing bucket
+directory = "/"            # destination directory
+is_incremental = false
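For an incremental [sink.local], the comments above say replicated files land under yyyy-mm-dd directories derived from each file's modified time. A small sketch of the destination layout that implies; this is illustrative only, not the actual sink code:

package main

import (
	"fmt"
	"path/filepath"
	"time"
)

func main() {
	// an incremental local sink groups replicated files under
	// date directories computed from the source file's modified time
	modTime := time.Date(2021, 8, 15, 10, 30, 0, 0, time.UTC)
	dest := filepath.Join("/data", modTime.Format("2006-01-02"), "notes.txt")
	fmt.Println(dest) // /data/2021-08-15/notes.txt
}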
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
new file mode 100644
index 000000000..0c69b2f24
--- /dev/null
+++ b/weed/command/scaffold/security.toml
@@ -0,0 +1,60 @@
+# Put this file in one of the locations, with descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt expires after 10 seconds by default.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10  # seconds
+
+# jwt for read is only supported with a master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10  # seconds
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PEM files.
+# the host name is not checked, so the PEM files can be shared.
+[grpc]
+ca = ""
+# Set a wildcard domain to enable TLS authentication by common names
+allowed_wildcard_domain = ""  # .mycompany.com
+
+[grpc.volume]
+cert = ""
+key = ""
+allowed_commonNames = ""  # comma-separated SSL certificate common names
+
+[grpc.master]
+cert = ""
+key = ""
+allowed_commonNames = ""  # comma-separated SSL certificate common names
+
+[grpc.filer]
+cert = ""
+key = ""
+allowed_commonNames = ""  # comma-separated SSL certificate common names
+
+[grpc.msg_broker]
+cert = ""
+key = ""
+allowed_commonNames = ""  # comma-separated SSL certificate common names
+
+# use this for any place that needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+# volume server https options
+# Note: work in progress!
+# this does not yet work with other clients, e.g., "weed filer|mount" etc.
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
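The [grpc.client] cert and key above feed a mutual-TLS gRPC dial. A minimal sketch of that pattern with grpc-go, using placeholder paths and address; SeaweedFS's actual loader (security.LoadClientTLS, seen in the upload.go and download.go diffs) wraps this logic, so treat the helper below as an assumption-laden illustration:

package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// dialSecure dials a gRPC server with a client certificate and a CA pool,
// mirroring the mutual-TLS setup security.toml describes.
func dialSecure(addr, caFile, certFile, keyFile string) (*grpc.ClientConn, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, err
	}
	caData, err := ioutil.ReadFile(caFile)
	if err != nil {
		return nil, err
	}
	pool := x509.NewCertPool()
	pool.AppendCertsFromPEM(caData)
	creds := credentials.NewTLS(&tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
	})
	return grpc.Dial(addr, grpc.WithTransportCredentials(creds))
}

func main() {
	conn, err := dialSecure("localhost:18888", "ca.pem", "client.pem", "client.key")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}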
diff --git a/weed/command/scaffold/shell.toml b/weed/command/scaffold/shell.toml
new file mode 100644
index 000000000..288ae2efe
--- /dev/null
+++ b/weed/command/scaffold/shell.toml
@@ -0,0 +1,10 @@
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333"  # comma-separated master servers
+filer = "localhost:8888"   # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
diff --git a/weed/command/server.go b/weed/command/server.go
index d2bd6466e..c784d90b9 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -3,6 +3,7 @@ package command
 import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/util/grace"
+	"net/http"
 	"os"
 	"strings"
 	"time"
@@ -16,6 +17,8 @@ import (
 type ServerOptions struct {
 	cpuprofile *string
 	memprofile *string
+	debug      *bool
+	debugPort  *int
 	v          VolumeServerOptions
 }
@@ -78,13 +81,15 @@ func init() {
 	serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
 	serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
+	serverOptions.debug = cmdServer.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2")
+	serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")
 
 	masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
 	masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
 	masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list")
 	masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
 	masterOptions.volumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
-	masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "000", "Default replication type if not specified.")
+	masterOptions.defaultReplication = cmdServer.Flag.String("master.defaultReplication", "", "Default replication type if not specified.")
 	masterOptions.garbageThreshold = cmdServer.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
 	masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
 	masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
@@ -107,10 +112,11 @@ func init() {
 	serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
 	serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 	serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
-	serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+	serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with a non-local volume: 'not found|proxy to remote node|redirect volume location'.")
 	serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
 	serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
 	serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+	serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
 	serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
 	serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
 	serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
@@ -139,6 +145,10 @@ func init() {
 
 func runServer(cmd *Command, args []string) bool {
 
+	if *serverOptions.debug {
+		go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil)
+	}
+
 	util.LoadConfiguration("security", false)
 	util.LoadConfiguration("master", false)
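The new -debug flag starts a side HTTP listener so the Go runtime's profiling endpoints can be scraped while the server runs. A self-contained sketch of the same pattern; note that the /debug/pprof handlers must be registered somewhere for the nil-handler listener to serve them, typically via the blank net/http/pprof import shown here:

package main

import (
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

func main() {
	debugPort := 6060
	// a side listener, as runServer now starts when -debug is set; with a
	// nil handler, http.ListenAndServe serves the default mux carrying pprof
	go func() {
		log.Println(http.ListenAndServe(fmt.Sprintf(":%d", debugPort), nil))
	}()
	select {} // stand-in for the real server's work
}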
diff --git a/weed/command/shell.go b/weed/command/shell.go
index c9976e809..4a9f4b027 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -55,6 +55,7 @@ func runShell(command *Command, args []string) bool {
 
 	var err error
 	shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+	shellOptions.FilerAddress = *shellInitialFiler
 	if err != nil {
 		fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
 		return false
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 0f9361b40..9ae1befab 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -71,20 +71,20 @@ func runUpload(cmd *Command, args []string) bool {
 	util.LoadConfiguration("security", false)
 	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
 
-	defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
+	defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master)
 	if err != nil {
 		fmt.Printf("upload: %v", err)
 		return false
 	}
 	if *upload.replication == "" {
-		*upload.replication = defaultCollection
+		*upload.replication = defaultReplication
 	}
 
 	if len(args) == 0 {
 		if *upload.dir == "" {
 			return false
 		}
-		filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
+		err = filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error {
 			if err == nil {
 				if !info.IsDir() {
 					if *upload.include != "" {
@@ -108,12 +108,21 @@ func runUpload(cmd *Command, args []string) bool {
 			}
 			return err
 		})
+		if err != nil {
+			fmt.Println(err.Error())
+			return false
+		}
 	} else {
 		parts, e := operation.NewFileParts(args)
 		if e != nil {
 			fmt.Println(e.Error())
+			return false
+		}
+		results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+		if err != nil {
+			fmt.Println(err.Error())
+			return false
 		}
-		results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
 		bytes, _ := json.Marshal(results)
 		fmt.Println(string(bytes))
 	}
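The upload.go change assigns filepath.Walk's return value to err and checks it, where it was previously discarded. The point is that an error returned from the walk callback aborts the walk and is handed back to the caller, so capturing the return value is what actually surfaces I/O failures. A minimal sketch:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// an error returned from the callback aborts the walk, and
	// filepath.Walk returns that error to the caller
	err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err // propagate instead of silently skipping
		}
		if !info.IsDir() {
			fmt.Println(path)
		}
		return nil
	})
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(1)
	}
}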
cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") @@ -228,10 +230,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, - *v.fixJpgOrientation, *v.readRedirect, + *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, + int64(*v.concurrentDownloadLimitMB)*1024*1024, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -259,6 +262,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // Stop heartbeats if !volumeServer.StopHeartbeat() { + volumeServer.SetStopping() glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) } |