| author | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
|---|---|---|
| committer | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
| commit | 1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch) | |
| tree | 12c3da17eb2d1a43fef78021a3d7c79110b0ff5f | /weed/command |
| parent | e6e57db530217ff57b3622b4672b03ebb6313e96 (diff) | |
| parent | f9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff) | |
| download | seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz, seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip | |
merge master, resolve conflicts
Diffstat (limited to 'weed/command')
28 files changed, 1310 insertions, 861 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go
new file mode 100644
index 000000000..9a545a183
--- /dev/null
+++ b/weed/command/autocomplete.go
@@ -0,0 +1,109 @@
+package command
+
+import (
+    "fmt"
+    flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
+    "github.com/posener/complete"
+    completeinstall "github.com/posener/complete/cmd/install"
+    "runtime"
+)
+
+func AutocompleteMain(commands []*Command) bool {
+    subCommands := make(map[string]complete.Command)
+    helpSubCommands := make(map[string]complete.Command)
+    for _, cmd := range commands {
+        flags := make(map[string]complete.Predictor)
+        cmd.Flag.VisitAll(func(flag *flag.Flag) {
+            flags["-"+flag.Name] = complete.PredictAnything
+        })
+
+        subCommands[cmd.Name()] = complete.Command{
+            Flags: flags,
+        }
+        helpSubCommands[cmd.Name()] = complete.Command{}
+    }
+    subCommands["help"] = complete.Command{Sub: helpSubCommands}
+
+    globalFlags := make(map[string]complete.Predictor)
+    flag.VisitAll(func(flag *flag.Flag) {
+        globalFlags["-"+flag.Name] = complete.PredictAnything
+    })
+
+    weedCmd := complete.Command{
+        Sub:         subCommands,
+        Flags:       globalFlags,
+        GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
+    }
+    cmp := complete.New("weed", weedCmd)
+
+    return cmp.Complete()
+}
+
+func installAutoCompletion() bool {
+    if runtime.GOOS == "windows" {
+        fmt.Println("windows is not supported")
+        return false
+    }
+
+    err := completeinstall.Install("weed")
+    if err != nil {
+        fmt.Printf("install failed! %s\n", err)
+        return false
+    }
+    fmt.Printf("autocompletion is enabled. Please restart your shell.\n")
+    return true
+}
+
+func uninstallAutoCompletion() bool {
+    if runtime.GOOS == "windows" {
+        fmt.Println("windows is not supported")
+        return false
+    }
+
+    err := completeinstall.Uninstall("weed")
+    if err != nil {
+        fmt.Printf("uninstall failed! %s\n", err)
+        return false
+    }
+    fmt.Printf("autocompletion is disable. Please restart your shell.\n")
+    return true
+}
+
+var cmdAutocomplete = &Command{
+    Run:       runAutocomplete,
+    UsageLine: "autocomplete",
+    Short:     "install autocomplete",
+    Long: `weed autocomplete is installed in the shell.
+
+    Supported shells are bash, zsh, and fish.
+    Windows is not supported.
+
+`,
+}
+
+func runAutocomplete(cmd *Command, args []string) bool {
+    if len(args) != 0 {
+        cmd.Usage()
+    }
+
+    return installAutoCompletion()
+}
+
+var cmdUnautocomplete = &Command{
+    Run:       runUnautocomplete,
+    UsageLine: "autocomplete.uninstall",
+    Short:     "uninstall autocomplete",
+    Long: `weed autocomplete is uninstalled in the shell.
+
+    Windows is not supported.
+
+`,
+}
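Review note: the completion tree mirrors the `Commands` table one-to-one — each subcommand contributes its flag set via `Flag.VisitAll`, and `help` gets a synthetic subcommand map so `weed help <cmd>` completes too. (Nit: the uninstall message reads "autocompletion is disable".) A minimal, self-contained sketch of the same `github.com/posener/complete` wiring; the `mytool`/`run` names are illustrative, not from the patch:

```go
package main

import "github.com/posener/complete"

func main() {
	// One subcommand with a predictable flag, mirroring how
	// AutocompleteMain builds one complete.Command per *Command.
	run := complete.Command{
		Flags: complete.Flags{"-dir": complete.PredictDirs("*")},
	}
	root := complete.Command{
		Sub:         complete.Commands{"run": run},
		GlobalFlags: complete.Flags{"-h": complete.PredictNothing},
	}
	// Complete() reads COMP_LINE/COMP_POINT set by the shell and prints
	// candidates; it returns true only when invoked for a completion.
	complete.New("mytool", root).Complete()
}
```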
+func runUnautocomplete(cmd *Command, args []string) bool {
+    if len(args) != 0 {
+        cmd.Usage()
+    }
+
+    return uninstallAutoCompletion()
+}
diff --git a/weed/command/command.go b/weed/command/command.go
index 18e53ad8c..02de2bd35 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -8,17 +8,20 @@ import (
 )
 
 var Commands = []*Command{
-    cmdBenchmark,
+    cmdAutocomplete,
+    cmdUnautocomplete,
     cmdBackup,
+    cmdBenchmark,
     cmdCompact,
-    cmdCopy,
     cmdDownload,
     cmdExport,
     cmdFiler,
     cmdFilerBackup,
     cmdFilerCat,
+    cmdFilerCopy,
     cmdFilerMetaBackup,
     cmdFilerMetaTail,
+    cmdFilerRemoteSynchronize,
     cmdFilerReplicate,
     cmdFilerSynchronize,
     cmdFix,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a723b4d8a..ddee0852c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -50,6 +50,8 @@ type FilerOptions struct {
     saveToFilerLimit        *int
     defaultLevelDbDirectory *string
     concurrentUploadLimitMB *int
+    debug                   *bool
+    debugPort               *int
 }
 
 func init() {
@@ -73,6 +75,8 @@ func init() {
     f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
     f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
     f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+    f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
+    f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
 
     // start s3 on filer
     filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -122,6 +126,9 @@ var cmdFiler = &Command{
 }
 
 func runFiler(cmd *Command, args []string) bool {
+    if *f.debug {
+        go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
+    }
 
     util.LoadConfiguration("security", false)
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 888b46fe7..0c450181b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
 package command
 
 import (
-    "context"
     "fmt"
     "github.com/chrislusf/seaweedfs/weed/glog"
     "github.com/chrislusf/seaweedfs/weed/pb"
-    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/replication/source"
     "github.com/chrislusf/seaweedfs/weed/security"
     "github.com/chrislusf/seaweedfs/weed/util"
     "google.golang.org/grpc"
-    "io"
     "time"
 )
 
@@ -52,11 +49,11 @@ var cmdFilerBackup = &Command{
 
 func runFilerBackup(cmd *Command, args []string) bool {
 
-    grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
     util.LoadConfiguration("security", false)
     util.LoadConfiguration("replication", true)
 
+    grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
     for {
         err := doFilerBackup(grpcDialOption, &filerBackupOptions)
         if err != nil {
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
 
     processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
 
-    return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-
-        stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
-            ClientName: "backup_" + dataSink.GetName(),
-            PathPrefix: sourcePath,
-            SinceNs:    startFrom.UnixNano(),
-        })
-        if err != nil {
-            return fmt.Errorf("listen: %v", err)
-        }
-
-        var counter int64
-        var lastWriteTime time.Time
-        for {
-            resp, listenErr := stream.Recv()
-
-            if listenErr == io.EOF {
-                return nil
-            }
-            if listenErr != nil {
-                return listenErr
-            }
-
-            if err := processEventFn(resp); err != nil {
-                return fmt.Errorf("processEventFn: %v", err)
-            }
-
-            counter++
-            if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
-                glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
-                counter = 0
-                lastWriteTime = time.Now()
-                if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
-                    return fmt.Errorf("setOffset: %v", err)
-                }
-            }
-
-        }
-
+    processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+        glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+        return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
     })
+
+    return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
+        sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
 }
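Review note: the same subscribe/count/checkpoint loop was previously copy-pasted into filer.backup, filer.meta.backup, and filer.sync; this merge collapses all three into `pb.AddOffsetFunc` plus `pb.FollowMetadata`. The wrapper is essentially a decorator over the event callback. A sketch of its shape, reconstructed from the deleted inline loops — an assumption, not the actual weed/pb source:

```go
package pb // sketch only; the real implementation lives in weed/pb

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error

// AddOffsetFunc wraps processEventFn so that roughly every offsetInterval
// the caller-supplied offsetFunc persists the last seen timestamp —
// exactly what the deleted loops did with counter/lastWriteTime.
func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration,
	offsetFunc func(counter int64, lastTsNs int64) error) ProcessMetadataFunc {
	var counter int64
	var lastOffsetTime time.Time
	return func(resp *filer_pb.SubscribeMetadataResponse) error {
		if err := processEventFn(resp); err != nil {
			return err
		}
		counter++
		if lastOffsetTime.Add(offsetInterval).Before(time.Now()) {
			if err := offsetFunc(counter, resp.TsNs); err != nil {
				return err
			}
			counter = 0
			lastOffsetTime = time.Now()
		}
		return nil
	}
}
```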
err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return fmt.Errorf("processEventFn: %v", err) - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil { - return fmt.Errorf("setOffset: %v", err) - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), + sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index a5d29c451..722f64679 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -52,21 +52,21 @@ type CopyOptions struct { } func init() { - cmdCopy.Run = runCopy // break init cycle - cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") - copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") - copy.replication = cmdCopy.Flag.String("replication", "", "replication type") - copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") - copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.diskType = cmdCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit") - copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") - copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") - copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file") - copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying") + cmdFilerCopy.Run = runCopy // break init cycle + cmdFilerCopy.IsDebug = cmdFilerCopy.Flag.Bool("debug", false, "verbose debug information") + copy.include = cmdFilerCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") + copy.replication = cmdFilerCopy.Flag.String("replication", "", "replication type") + copy.collection = cmdFilerCopy.Flag.String("collection", "", "optional collection name") + copy.ttl = cmdFilerCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") + copy.diskType = cmdFilerCopy.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") + copy.maxMB = cmdFilerCopy.Flag.Int("maxMB", 4, "split files larger than the limit") + copy.concurrenctFiles = cmdFilerCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctChunks = cmdFilerCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") + copy.checkSize = 
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index ba0b44659..6fe323fba 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
     "github.com/chrislusf/seaweedfs/weed/glog"
     "github.com/spf13/viper"
     "google.golang.org/grpc"
-    "io"
     "reflect"
     "time"
 
@@ -53,6 +52,7 @@ The backup writes to another filer store specified in a backup_filer.toml.
 
 func runFilerMetaBackup(cmd *Command, args []string) bool {
 
+    util.LoadConfiguration("security", false)
     metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
 
     // load backup_filer.toml
@@ -189,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
         return nil
     }
 
-    tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-
-        stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
-            ClientName: "meta_backup",
-            PathPrefix: *metaBackup.filerDirectory,
-            SinceNs:    startTime.UnixNano(),
-        })
-        if err != nil {
-            return fmt.Errorf("listen: %v", err)
-        }
-
-        var counter int64
-        var lastWriteTime time.Time
-        for {
-            resp, listenErr := stream.Recv()
-            if listenErr == io.EOF {
-                return nil
-            }
-            if listenErr != nil {
-                return listenErr
-            }
-            if err = eachEntryFunc(resp); err != nil {
-                return err
-            }
-
-            counter++
-            if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
-                glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
-                counter = 0
-                lastWriteTime = time.Now()
-                if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
-                    return err2
-                }
-            }
+    processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+        lastTime := time.Unix(0, lastTsNs)
+        glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+        return metaBackup.setOffset(lastTime)
+    })
 
-        }
+    return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+        *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
 
-    })
-    return tailErr
 }
 
 func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 8451ffd78..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
 import (
     "context"
     "fmt"
+    "github.com/chrislusf/seaweedfs/weed/pb"
     "github.com/golang/protobuf/jsonpb"
     jsoniter "github.com/json-iterator/go"
     "github.com/olivere/elastic/v7"
-    "io"
     "os"
     "path/filepath"
     "strings"
     "time"
 
-    "github.com/chrislusf/seaweedfs/weed/pb"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/security"
     "github.com/chrislusf/seaweedfs/weed/util"
@@ -45,6 +44,7 @@ var (
 
 func runFilerMetaTail(cmd *Command, args []string) bool {
 
+    util.LoadConfiguration("security", false)
     grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
 
     var filterFunc func(dir, fname string) bool
@@ -103,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
         }
     }
 
-    tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-
-        stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
-            ClientName: "tail",
-            PathPrefix: *tailTarget,
-            SinceNs:    time.Now().Add(-*tailStart).UnixNano(),
-        })
-        if err != nil {
-            return fmt.Errorf("listen: %v", err)
-        }
-
-        for {
-            resp, listenErr := stream.Recv()
-            if listenErr == io.EOF {
-                return nil
-            }
-            if listenErr != nil {
-                return listenErr
-            }
+    tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+        *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+        func(resp *filer_pb.SubscribeMetadataResponse) error {
             if !shouldPrint(resp) {
-                continue
+                return nil
             }
-            if err = eachEntryFunc(resp); err != nil {
+            if err := eachEntryFunc(resp); err != nil {
                 return err
             }
-        }
+            return nil
+        }, false)
 
-    })
     if tailErr != nil {
         fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
     }
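Review note: with the receive loop gone, filtering moves into the callback — the old `continue` becomes `return nil`. Composing a predicate with a handler keeps such callbacks small; a hedged helper sketch (names illustrative, not from the codebase):

```go
package metatail

import "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"

// filtered composes a predicate with a handler, mirroring the shouldPrint
// check inside the new tail closure: a rejected event returns nil (the old
// loop's `continue`); everything else flows to the handler.
func filtered(keep func(*filer_pb.SubscribeMetadataResponse) bool,
	handle func(*filer_pb.SubscribeMetadataResponse) error,
) func(*filer_pb.SubscribeMetadataResponse) error {
	return func(resp *filer_pb.SubscribeMetadataResponse) error {
		if !keep(resp) {
			return nil
		}
		return handle(resp)
	}
}
```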
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
new file mode 100644
index 000000000..b7e90b3e7
--- /dev/null
+++ b/weed/command/filer_remote_sync.go
@@ -0,0 +1,257 @@
+package command
+
+import (
+    "context"
+    "fmt"
+    "github.com/chrislusf/seaweedfs/weed/filer"
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb"
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+    "github.com/chrislusf/seaweedfs/weed/remote_storage"
+    "github.com/chrislusf/seaweedfs/weed/replication/source"
+    "github.com/chrislusf/seaweedfs/weed/security"
+    "github.com/chrislusf/seaweedfs/weed/util"
+    "github.com/golang/protobuf/proto"
+    "google.golang.org/grpc"
+    "time"
+)
+
+type RemoteSyncOptions struct {
+    filerAddress       *string
+    grpcDialOption     grpc.DialOption
+    readChunkFromFiler *bool
+    debug              *bool
+    timeAgo            *time.Duration
+    dir                *string
+}
+
+const (
+    RemoteSyncKeyPrefix = "remote.sync."
+)
+
+var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
+
+func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+    return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+        return fn(client)
+    })
+}
+func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
+    return location.Url
+}
+
+var (
+    remoteSyncOptions RemoteSyncOptions
+)
+
+func init() {
+    cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
+    remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
+    remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+    remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
+    remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
+    remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+}
+
+var cmdFilerRemoteSynchronize = &Command{
+    UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
+    Short:     "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
+    Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
+
+    filer.remote.sync listens on filer update events.
+    If any mounted remote file is updated, it will fetch the updated content,
+    and write to the remote storage.
+`,
+}
+
+func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
+
+    util.LoadConfiguration("security", false)
+    grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+    remoteSyncOptions.grpcDialOption = grpcDialOption
+
+    // read filer remote storage mount mappings
+    mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
+    if readErr != nil {
+        fmt.Printf("read mount mapping: %v", readErr)
+        return false
+    }
+
+    filerSource := &source.FilerSource{}
+    filerSource.DoInitialize(
+        *remoteSyncOptions.filerAddress,
+        pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+        "/", // does not matter
+        *remoteSyncOptions.readChunkFromFiler,
+    )
+
+    var found bool
+    for dir, remoteStorageMountLocation := range mappings.Mappings {
+        if *remoteSyncOptions.dir == dir {
+            found = true
+            storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
+            if readErr != nil {
+                fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
+                continue
+            }
+            fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
+            if err := util.Retry("filer.remote.sync "+dir, func() error {
+                return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
+            }); err != nil {
+                fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
+            }
+            break
+        }
+    }
+    if !found {
+        fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
+        return false
+    }
+
+    return true
+}
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
+
+    dirHash := util.HashStringToLong(mountedDir)
+
+    // 1. specified by timeAgo
+    // 2. last offset timestamp for this directory
+    // 3. directory creation time
+    var lastOffsetTs time.Time
+    if *option.timeAgo == 0 {
+        mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+        if err != nil {
+            return fmt.Errorf("lookup %s: %v", mountedDir, err)
+        }
+
+        lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
+        if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+            lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+            glog.V(0).Infof("resume from %v", lastOffsetTs)
+        } else {
+            lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+        }
+    } else {
+        lastOffsetTs = time.Now().Add(-*option.timeAgo)
+    }
+
+    client, err := remote_storage.GetRemoteStorage(remoteStorage)
+    if err != nil {
+        return err
+    }
+
+    eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+        message := resp.EventNotification
+        if message.OldEntry == nil && message.NewEntry == nil {
+            return nil
+        }
+        if message.OldEntry == nil && message.NewEntry != nil {
+            if len(message.NewEntry.Chunks) == 0 {
+                return nil
+            }
+            fmt.Printf("create: %+v\n", resp)
+            if !shouldSendToRemote(message.NewEntry) {
+                fmt.Printf("skipping creating: %+v\n", resp)
+                return nil
+            }
+            dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+            if message.NewEntry.IsDirectory {
+                return client.WriteDirectory(dest, message.NewEntry)
+            }
+            reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+            remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+            if writeErr != nil {
+                return writeErr
+            }
+            return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+        }
+        if message.OldEntry != nil && message.NewEntry == nil {
+            fmt.Printf("delete: %+v\n", resp)
+            dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+            return client.DeleteFile(dest)
+        }
+        if message.OldEntry != nil && message.NewEntry != nil {
+            oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+            dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+            if !shouldSendToRemote(message.NewEntry) {
+                fmt.Printf("skipping updating: %+v\n", resp)
+                return nil
+            }
+            if message.NewEntry.IsDirectory {
+                return client.WriteDirectory(dest, message.NewEntry)
+            }
+            if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+                if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
+                    fmt.Printf("update meta: %+v\n", resp)
+                    return client.UpdateFileMetadata(dest, message.NewEntry)
+                }
+            }
+            fmt.Printf("update: %+v\n", resp)
+            if err := client.DeleteFile(oldDest); err != nil {
+                return err
+            }
+            reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
+            remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+            if writeErr != nil {
+                return writeErr
+            }
+            return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+        }
+
+        return nil
+    }
+
+    processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+        lastTime := time.Unix(0, lastTsNs)
+        glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+        return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
+    })
+
+    return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
+        "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
+    source := string(sourcePath[len(mountDir):])
+    dest := util.FullPath(remoteMountLocation.Path).Child(source)
+    return &filer_pb.RemoteStorageLocation{
+        Name:   remoteMountLocation.Name,
+        Bucket: remoteMountLocation.Bucket,
+        Path:   string(dest),
+    }
+}
+
+func isSameChunks(a, b []*filer_pb.FileChunk) bool {
+    if len(a) != len(b) {
+        return false
+    }
+    for i := 0; i < len(a); i++ {
+        x, y := a[i], b[i]
+        if !proto.Equal(x, y) {
+            return false
+        }
+    }
+    return true
+}
+
+func shouldSendToRemote(entry *filer_pb.Entry) bool {
+    if entry.RemoteEntry == nil {
+        return true
+    }
+    if entry.RemoteEntry.LocalMtime < entry.Attributes.Mtime {
+        return true
+    }
+    return false
+}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+    entry.RemoteEntry = remoteEntry
+    return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+        _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+            Directory: dir,
+            Entry:     entry,
+        })
+        return err
+    })
+}
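Review note: the resume position is picked in priority order — explicit `-timeAgo`, then the stored per-directory offset (keyed by a hash of the mount path), then the mount directory's creation time. The stored offset is only trusted when it is newer than the directory's crtime, which guards against a stale checkpoint surviving a re-mount. A condensed sketch of that decision (one caveat: the committed comparison divides the nanosecond offset by 1000000, which mixes seconds with milliseconds; the sketch divides by 1e9 — worth double-checking upstream):

```go
// Condensed from followUpdatesAndUploadToRemote; getOffset, dirHash and
// mountedDirEntry come from the surrounding function.
var lastOffsetTs time.Time
if *option.timeAgo == 0 {
	lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
	if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1e9 {
		lastOffsetTs = time.Unix(0, lastOffsetTsNs) // resume from checkpoint
	} else {
		lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) // fall back to mount creation time
	}
} else {
	lastOffsetTs = time.Now().Add(-*option.timeAgo)
}
```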
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 885c95540..bf0a3e140 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -7,12 +7,6 @@ import (
     "github.com/chrislusf/seaweedfs/weed/glog"
     "github.com/chrislusf/seaweedfs/weed/replication"
     "github.com/chrislusf/seaweedfs/weed/replication/sink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
-    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
     "github.com/chrislusf/seaweedfs/weed/replication/sub"
     "github.com/chrislusf/seaweedfs/weed/util"
 )
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 0f34e5701..5440811dd 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
     "github.com/chrislusf/seaweedfs/weed/util"
     "github.com/chrislusf/seaweedfs/weed/util/grace"
     "google.golang.org/grpc"
-    "io"
     "strings"
     "time"
 )
@@ -71,8 +70,8 @@ func init() {
 
 var cmdFilerSynchronize = &Command{
     UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
-    Short:     "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
-    Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
+    Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
+    Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
 
     filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
     and write to the other destination.
 
     Different from filer.replicate:
@@ -89,6 +88,7 @@ var cmdFilerSynchronize = &Command{
 
 func runFilerSynchronize(cmd *Command, args []string) bool {
 
+    util.LoadConfiguration("security", false)
     grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
 
     grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
@@ -165,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
         return persistEventFn(resp)
     }
 
-    return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-
-        stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
-            ClientName: "syncTo_" + targetFiler,
-            PathPrefix: sourcePath,
-            SinceNs:    sourceFilerOffsetTsNs,
-            Signature:  targetFilerSignature,
-        })
-        if err != nil {
-            return fmt.Errorf("listen: %v", err)
-        }
-
-        var counter int64
-        var lastWriteTime time.Time
-        for {
-            resp, listenErr := stream.Recv()
-            if listenErr == io.EOF {
-                return nil
-            }
-            if listenErr != nil {
-                return listenErr
-            }
-
-            if err := processEventFn(resp); err != nil {
-                return err
-            }
-
-            counter++
-            if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
-                glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
-                counter = 0
-                lastWriteTime = time.Now()
-                if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
-                    return err
-                }
-            }
-
-        }
-
+    processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+        glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+        return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
     })
+
+    return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
+        sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
 }
 
 const (
@@ -359,16 +323,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
     return processEventFn
 }
 
-func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string {
+func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
     if !dataSink.IsIncremental() {
-        return util.Join(targetPath, string(sourceKey)[len(sourcePath):])
-    }
-    var mTime int64
-    if message.NewEntry != nil {
-        mTime = message.NewEntry.Attributes.Mtime
-    } else if message.OldEntry != nil {
-        mTime = message.OldEntry.Attributes.Mtime
+        key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
+    } else {
+        var mTime int64
+        if message.NewEntry != nil {
+            mTime = message.NewEntry.Attributes.Mtime
+        } else if message.OldEntry != nil {
+            mTime = message.OldEntry.Attributes.Mtime
+        }
+        dateKey := time.Unix(mTime, 0).Format("2006-01-02")
+        key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
     }
-    dateKey := time.Unix(mTime, 0).Format("2006-01-02")
-    return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
+
+    return escapeKey(key)
 }
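Review note: for incremental sinks, buildKey now inserts a yyyy-mm-dd segment derived from the entry mtime, so each day's changes land in a folder of their own; non-incremental sinks keep the plain source-relative path. Every key also passes through `escapeKey` (added in the two new files just below). An illustrative walk-through, assuming sourcePath="/buckets" and targetPath="/backup":

```go
// Illustrative only: what buildKey yields for an incremental sink.
mTime := time.Date(2021, 8, 10, 13, 45, 24, 0, time.UTC).Unix()
dateKey := time.Unix(mTime, 0).Format("2006-01-02") // "2021-08-10" (local zone)
key := util.Join("/backup", dateKey, "/a/b.txt")    // "/backup/2021-08-10/a/b.txt"
_ = escapeKey(key) // on Windows this additionally strips ":" characters
```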
diff --git a/weed/command/filer_sync_std.go b/weed/command/filer_sync_std.go
new file mode 100644
index 000000000..63851eaf8
--- /dev/null
+++ b/weed/command/filer_sync_std.go
@@ -0,0 +1,7 @@
+// +build !windows
+
+package command
+
+func escapeKey(key string) string {
+    return key
+}
diff --git a/weed/command/filer_sync_windows.go b/weed/command/filer_sync_windows.go
new file mode 100644
index 000000000..3d0c9146e
--- /dev/null
+++ b/weed/command/filer_sync_windows.go
@@ -0,0 +1,12 @@
+package command
+
+import (
+    "strings"
+)
+
+func escapeKey(key string) string {
+    if strings.Contains(key, ":") {
+        return strings.ReplaceAll(key, ":", "")
+    }
+    return key
+}
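Review note: escapeKey is selected at compile time — the `// +build !windows` file is a no-op passthrough, while the `_windows.go` file (chosen by filename suffix alone) strips `:` characters, which are illegal in Windows file names. The `strings.Contains` pre-check is redundant, since `strings.ReplaceAll` already returns its input unchanged when there is no match, though it is harmless. The same per-OS dispatch in miniature (hypothetical package and names):

```go
// file: escape_windows.go — the _windows suffix selects this on Windows;
// a sibling escape_other.go carrying "// +build !windows" would hold
// the passthrough version: func Escape(key string) string { return key }

package keys

import "strings"

// Escape drops ":" which Windows does not allow in file names.
func Escape(key string) string {
	return strings.ReplaceAll(key, ":", "")
}
```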
diff --git a/weed/command/fuse.go b/weed/command/fuse.go
index 0a55e509c..a0dcaa86c 100644
--- a/weed/command/fuse.go
+++ b/weed/command/fuse.go
@@ -2,140 +2,232 @@ package command
 
 import (
     "fmt"
-    "strings"
+    "os"
     "strconv"
+    "strings"
     "time"
-    "os"
 )
 
 func init() {
     cmdFuse.Run = runFuse // break init cycle
 }
 
+type parameter struct {
+    name  string
+    value string
+}
+
 func runFuse(cmd *Command, args []string) bool {
-    argsLen := len(args)
-    options := []string{}
+    rawArgs := strings.Join(args, " ")
+    rawArgsLen := len(rawArgs)
+    option := strings.Builder{}
+    options := []parameter{}
+    masterProcess := true
+    fusermountPath := ""
+
+    // first parameter
+    i := 0
+    for i = 0; i < rawArgsLen && rawArgs[i] != ' '; i++ {
+        option.WriteByte(rawArgs[i])
+    }
+    options = append(options, parameter{"arg0", option.String()})
+    option.Reset()
+
+    for i++; i < rawArgsLen; i++ {
+
+        // space separator check for filled option
+        if rawArgs[i] == ' ' {
+            if option.Len() > 0 {
+                options = append(options, parameter{option.String(), "true"})
+                option.Reset()
+            }
 
-    // at least target mount path should be passed
-    if argsLen < 1 {
-        return false
+        // dash separator read option until next space
+        } else if rawArgs[i] == '-' {
+            for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ {
+                option.WriteByte(rawArgs[i])
+            }
+            options = append(options, parameter{option.String(), "true"})
+            option.Reset()
+
+        // equal separator start option with pending value
+        } else if rawArgs[i] == '=' {
+            name := option.String()
+            option.Reset()
+
+            for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ {
+                // double quote separator read option until next double quote
+                if rawArgs[i] == '"' {
+                    for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ {
+                        option.WriteByte(rawArgs[i])
+                    }
+
+                // single quote separator read option until next single quote
+                } else if rawArgs[i] == '\'' {
+                    for i++; i < rawArgsLen && rawArgs[i] != '\''; i++ {
+                        option.WriteByte(rawArgs[i])
+                    }
+
+                // add chars before comma
+                } else if rawArgs[i] != ' ' {
+                    option.WriteByte(rawArgs[i])
+                }
+            }
+
+            options = append(options, parameter{name, option.String()})
+            option.Reset()
+
+        // comma separator just read current option
+        } else if rawArgs[i] == ',' {
+            options = append(options, parameter{option.String(), "true"})
+            option.Reset()
+
+        // what is not a separator fill option buffer
+        } else {
+            option.WriteByte(rawArgs[i])
+        }
     }
 
-    // first option is always target mount path
-    mountOptions.dir = &args[0]
+    // get residual option data
+    if option.Len() > 0 {
+        // add value to pending option
+        options = append(options, parameter{option.String(), "true"})
+        option.Reset()
+    }
 
-    // scan parameters looking for one or more -o options
-    // -o options receive parameters on format key=value[,key=value]...
-    for i := 0; i < argsLen; i++ {
-        if args[i] == "-o" && i+1 <= argsLen {
-            options = strings.Split(args[i+1], ",")
+    // scan each parameter
+    for i := 0; i < len(options); i++ {
+        parameter := options[i]
+
+        switch parameter.name {
+        case "child":
+            masterProcess = false
+        case "arg0":
+            mountOptions.dir = &parameter.value
+        case "filer":
+            mountOptions.filer = &parameter.value
+        case "filer.path":
+            mountOptions.filerMountRootPath = &parameter.value
+        case "dirAutoCreate":
+            if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+                mountOptions.dirAutoCreate = &parsed
+            } else {
+                panic(fmt.Errorf("dirAutoCreate: %s", err))
+            }
+        case "collection":
+            mountOptions.collection = &parameter.value
+        case "replication":
+            mountOptions.replication = &parameter.value
+        case "disk":
+            mountOptions.diskType = &parameter.value
+        case "ttl":
+            if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+                intValue := int(parsed)
+                mountOptions.ttlSec = &intValue
+            } else {
+                panic(fmt.Errorf("ttl: %s", err))
+            }
+        case "chunkSizeLimitMB":
+            if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+                intValue := int(parsed)
+                mountOptions.chunkSizeLimitMB = &intValue
+            } else {
+                panic(fmt.Errorf("chunkSizeLimitMB: %s", err))
+            }
+        case "concurrentWriters":
+            i++
+            if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+                intValue := int(parsed)
+                mountOptions.concurrentWriters = &intValue
+            } else {
+                panic(fmt.Errorf("concurrentWriters: %s", err))
+            }
+        case "cacheDir":
+            mountOptions.cacheDir = &parameter.value
+        case "cacheCapacityMB":
+            if parsed, err := strconv.ParseInt(parameter.value, 0, 64); err == nil {
+                mountOptions.cacheSizeMB = &parsed
+            } else {
+                panic(fmt.Errorf("cacheCapacityMB: %s", err))
+            }
+        case "dataCenter":
+            mountOptions.dataCenter = &parameter.value
+        case "allowOthers":
+            if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+                mountOptions.allowOthers = &parsed
+            } else {
+                panic(fmt.Errorf("allowOthers: %s", err))
+            }
+        case "umask":
+            mountOptions.umaskString = &parameter.value
+        case "nonempty":
+            if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+                mountOptions.nonempty = &parsed
+            } else {
+                panic(fmt.Errorf("nonempty: %s", err))
+            }
+        case "volumeServerAccess":
+            mountOptions.volumeServerAccess = &parameter.value
+        case "map.uid":
+            mountOptions.uidMap = &parameter.value
+        case "map.gid":
+            mountOptions.gidMap = &parameter.value
+        case "readOnly":
+            if parsed, err := strconv.ParseBool(parameter.value); err == nil {
+                mountOptions.readOnly = &parsed
+            } else {
+                panic(fmt.Errorf("readOnly: %s", err))
+            }
+        case "cpuprofile":
+            mountCpuProfile = &parameter.value
+        case "memprofile":
+            mountMemProfile = &parameter.value
+        case "readRetryTime":
+            if parsed, err := time.ParseDuration(parameter.value); err == nil {
+                mountReadRetryTime = &parsed
+            } else {
+                panic(fmt.Errorf("readRetryTime: %s", err))
+            }
+        case "fusermount.path":
+            fusermountPath = parameter.value
+        }
+    }
 
-    // for each option passed with -o
-    for _, option := range options {
-        // split just first = character
-        parts := strings.SplitN(option, "=", 2)
+    // the master start the child, release it then finish himself
+    if masterProcess {
+        arg0, err := os.Executable()
+        if err != nil {
+            panic(err)
+        }
+
+        argv := append(os.Args, "-o", "child")
+
+        attr := os.ProcAttr{}
+        attr.Env = os.Environ()
+
+        child, err := os.StartProcess(arg0, argv, &attr)
 
-        // if doesn't key and value skip
-        if len(parts) != 2 {
-            continue
+        if err != nil {
+            panic(fmt.Errorf("master process can not start child process: %s", err))
         }
 
-        key, value := parts[0], parts[1]
-
-        // switch key keeping "weed mount" parameters
-        switch key {
-        case "filer":
-            mountOptions.filer = &value
-        case "filer.path":
-            mountOptions.filerMountRootPath = &value
-        case "dirAutoCreate":
-            if parsed, err := strconv.ParseBool(value); err != nil {
-                mountOptions.dirAutoCreate = &parsed
-            } else {
-                panic(fmt.Errorf("dirAutoCreate: %s", err))
-            }
-        case "collection":
-            mountOptions.collection = &value
-        case "replication":
-            mountOptions.replication = &value
-        case "disk":
-            mountOptions.diskType = &value
-        case "ttl":
-            if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
-                intValue := int(parsed)
-                mountOptions.ttlSec = &intValue
-            } else {
-                panic(fmt.Errorf("ttl: %s", err))
-            }
-        case "chunkSizeLimitMB":
-            if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
-                intValue := int(parsed)
-                mountOptions.chunkSizeLimitMB = &intValue
-            } else {
-                panic(fmt.Errorf("chunkSizeLimitMB: %s", err))
-            }
-        case "concurrentWriters":
-            if parsed, err := strconv.ParseInt(value, 0, 32); err != nil {
-                intValue := int(parsed)
-                mountOptions.concurrentWriters = &intValue
-            } else {
-                panic(fmt.Errorf("concurrentWriters: %s", err))
-            }
-        case "cacheDir":
-            mountOptions.cacheDir = &value
-        case "cacheCapacityMB":
-            if parsed, err := strconv.ParseInt(value, 0, 64); err != nil {
-                mountOptions.cacheSizeMB = &parsed
-            } else {
-                panic(fmt.Errorf("cacheCapacityMB: %s", err))
-            }
-        case "dataCenter":
-            mountOptions.dataCenter = &value
-        case "allowOthers":
-            if parsed, err := strconv.ParseBool(value); err != nil {
-                mountOptions.allowOthers = &parsed
-            } else {
-                panic(fmt.Errorf("allowOthers: %s", err))
-            }
-        case "umask":
-            mountOptions.umaskString = &value
-        case "nonempty":
-            if parsed, err := strconv.ParseBool(value); err != nil {
-                mountOptions.nonempty = &parsed
-            } else {
-                panic(fmt.Errorf("nonempty: %s", err))
-            }
-        case "volumeServerAccess":
-            mountOptions.volumeServerAccess = &value
-        case "map.uid":
-            mountOptions.uidMap = &value
-        case "map.gid":
-            mountOptions.gidMap = &value
-        case "readOnly":
-            if parsed, err := strconv.ParseBool(value); err != nil {
-                mountOptions.readOnly = &parsed
-            } else {
-                panic(fmt.Errorf("readOnly: %s", err))
-            }
-        case "cpuprofile":
-            mountCpuProfile = &value
-        case "memprofile":
-            mountMemProfile = &value
-        case "readRetryTime":
-            if parsed, err := time.ParseDuration(value); err != nil {
-                mountReadRetryTime = &parsed
-            } else {
-                panic(fmt.Errorf("readRetryTime: %s", err))
-            }
-        }
+        err = child.Release()
+
+        if err != nil {
+            panic(fmt.Errorf("master process can not release child process: %s", err))
+        }
+
+        return true
     }
 
-    // I don't know why PATH environment variable is lost
-    if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin"); err != nil {
-        panic(fmt.Errorf("setenv: %s", err))
+    if fusermountPath != "" {
+        if err := os.Setenv("PATH", fusermountPath); err != nil {
+            panic(fmt.Errorf("setenv: %s", err))
+        }
+    } else if os.Getenv("PATH") == "" {
+        if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil {
+            panic(fmt.Errorf("setenv: %s", err))
+        }
     }
 
     // just call "weed mount" command
@@ -144,7 +236,7 @@ func runFuse(cmd *Command, args []string) bool {
 
 var cmdFuse = &Command{
     UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
-    Short:     "Allow use weed with linux's mount command",
+    Short:     "Allow use weed with linux's mount command",
     Long: `Allow use weed with linux's mount command
 
     You can use -t weed on mount command:
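Review note: the first invocation (the "master") now re-execs itself with an extra `-o child` option via os.StartProcess, releases the child, and returns — that is how `mount -t weed` ends up with a detached daemon. A freestanding sketch of the same fork-and-release idiom (argv handling deliberately simplified; the child's stdio is not wired up here):

```go
package main

import (
	"fmt"
	"os"
)

// respawnDetached re-launches the current binary with a marker argument,
// then releases the child so the parent can exit — the same pattern as
// runFuse's masterProcess branch.
func respawnDetached() error {
	arg0, err := os.Executable()
	if err != nil {
		return err
	}
	argv := append(os.Args, "-o", "child")
	child, err := os.StartProcess(arg0, argv, &os.ProcAttr{Env: os.Environ()})
	if err != nil {
		return fmt.Errorf("start child: %w", err)
	}
	return child.Release()
}

func main() {
	for _, a := range os.Args {
		if a == "child" { // crude marker check; the real code parses -o options
			// ... the actual long-running work happens here ...
			return
		}
	}
	if err := respawnDetached(); err != nil {
		panic(err)
	}
}
```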
@@ -160,6 +252,9 @@ var cmdFuse = &Command{
     mount -t fuse./home/user/bin/weed fuse /mnt -o "filer=localhost:8888,filer.path=/"
     mount -t fuse "/home/user/bin/weed#fuse" /mnt -o "filer=localhost:8888,filer.path=/"
 
+    To pass more than one parameter use quotes, example:
+    mount -t weed fuse /mnt -o "filer='192.168.0.1:8888,192.168.0.2:8888',filer.path=/"
+
     To check valid options look "weed mount --help"
 `,
 }
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 17d0832cb..ed4eea543 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -49,6 +49,7 @@ func (iamopt *IamOptions) startIamServer() bool {
         return false
     }
 
+    util.LoadConfiguration("security", false)
     grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
     for {
         err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/command/imports.go b/weed/command/imports.go
new file mode 100644
index 000000000..ce0bf0e10
--- /dev/null
+++ b/weed/command/imports.go
@@ -0,0 +1,30 @@
+package command
+
+import (
+    _ "net/http/pprof"
+
+    _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
+
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink"
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink"
+    _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
+
+    _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+    _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+)
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index e72a2f2cf..cdf340067 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -5,15 +5,18 @@ package command
 import (
     "context"
     "fmt"
-    "github.com/chrislusf/seaweedfs/weed/storage/types"
     "os"
     "os/user"
     "path"
+    "path/filepath"
     "runtime"
     "strconv"
     "strings"
+    "syscall"
     "time"
 
+    "github.com/chrislusf/seaweedfs/weed/storage/types"
+
     "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
 
     "github.com/seaweedfs/fuse"
@@ -49,6 +52,21 @@ func runMount(cmd *Command, args []string) bool {
     return RunMount(&mountOptions, os.FileMode(umask))
 }
 
+func getParentInode(mountDir string) (uint64, error) {
+    parentDir := filepath.Clean(filepath.Join(mountDir, ".."))
+    fi, err := os.Stat(parentDir)
+    if err != nil {
+        return 0, err
+    }
+
+    stat, ok := fi.Sys().(*syscall.Stat_t)
+    if !ok {
+        return 0, nil
+    }
+
+    return stat.Ino, nil
+}
+
 func RunMount(option *MountOptions, umask os.FileMode) bool {
 
     filers := strings.Split(*option.filer, ",")
@@ -85,13 +103,19 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 
     filerMountRootPath := *option.filerMountRootPath
     dir := util.ResolvePath(*option.dir)
-    chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
+    parentInode, err := getParentInode(dir)
+    if err != nil {
+        glog.Errorf("failed to retrieve inode for parent directory of %s: %v", dir, err)
+        return true
+    }
 
     fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
     if dir == "" {
         fmt.Printf("Please specify the mount directory via \"-dir\"")
         return false
     }
+
+    chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
     if chunkSizeLimitMB <= 0 {
         fmt.Printf("Please specify a reasonable buffer size.")
         return false
@@ -199,6 +223,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
         MountMode:          mountMode,
         MountCtime:         fileInfo.ModTime(),
         MountMtime:         time.Now(),
+        MountParentInode:   parentInode,
         Umask:              umask,
         VolumeServerAccess: *mountOptions.volumeServerAccess,
         Cipher:             cipher,
@@ -221,6 +246,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
     glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
     server := fs.New(c, nil)
     seaweedFileSystem.Server = server
+    seaweedFileSystem.StartBackgroundTasks()
     err = server.Serve(seaweedFileSystem)
 
     // check if the mount process has an error to report
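Review note: the new imports.go concentrates side-effect-only imports in one place — each `_` import registers a filer store, a replication sink, or, for `net/http/pprof`, HTTP handlers on http.DefaultServeMux. That last one is what makes the new `weed filer -debug` flag work with a bare `http.ListenAndServe(addr, nil)` in runFiler. The pattern, self-contained:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* on DefaultServeMux
)

func main() {
	// A nil handler means http.DefaultServeMux, where pprof registered itself,
	// e.g. http://localhost:6060/debug/pprof/goroutine?debug=2
	log.Println(http.ListenAndServe(":6060", nil))
}
```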
*option.filerMountRootPath dir := util.ResolvePath(*option.dir) - chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB + parentInode, err := getParentInode(dir) + if err != nil { + glog.Errorf("failed to retrieve inode for parent directory of %s: %v", dir, err) + return true + } fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") return false } + + chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB if chunkSizeLimitMB <= 0 { fmt.Printf("Please specify a reasonable buffer size.") return false @@ -199,6 +223,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { MountMode: mountMode, MountCtime: fileInfo.ModTime(), MountMtime: time.Now(), + MountParentInode: parentInode, Umask: umask, VolumeServerAccess: *mountOptions.volumeServerAccess, Cipher: cipher, @@ -221,6 +246,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir) server := fs.New(c, nil) seaweedFileSystem.Server = server + seaweedFileSystem.StartBackgroundTasks() err = server.Serve(seaweedFileSystem) // check if the mount process has an error to report diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 2d6729bd3..886c0ac5e 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -1,6 +1,8 @@ package command import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/command/scaffold" "io/ioutil" "path/filepath" ) @@ -35,17 +37,17 @@ func runScaffold(cmd *Command, args []string) bool { content := "" switch *config { case "filer": - content = FILER_TOML_EXAMPLE + content = scaffold.Filer case "notification": - content = NOTIFICATION_TOML_EXAMPLE + content = scaffold.Notification case "replication": - content = REPLICATION_TOML_EXAMPLE + content = scaffold.Replication case "security": - content = SECURITY_TOML_EXAMPLE + content = scaffold.Security case "master": - content = MASTER_TOML_EXAMPLE + content = scaffold.Master case "shell": - content = SHELL_TOML_EXAMPLE + content = scaffold.Shell } if content == "" { println("need a valid -config option") @@ -55,519 +57,7 @@ func runScaffold(cmd *Command, args []string) bool { if *outputPath != "" { ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644) } else { - println(content) + fmt.Println(content) } return true } - -const ( - FILER_TOML_EXAMPLE = ` -# A sample TOML config file for SeaweedFS filer store -# Used with "weed filer" or "weed server -filer" -# Put this file to one of the location, with descending priority -# ./filer.toml -# $HOME/.seaweedfs/filer.toml -# /etc/seaweedfs/filer.toml - -#################################################### -# Customizable filer server options -#################################################### -[filer.options] -# with http DELETE, by default the filer would check whether a folder is empty. -# recursive_delete will delete all sub folders and files, similar to "rm -Rf" -recursive_delete = false -# directories under this folder will be automatically creating a separate bucket -buckets_folder = "/buckets" - -#################################################### -# The following are filer store options -#################################################### - -[leveldb2] -# local on disk, mostly for simple single-machine setup, fairly scalable -# faster than previous leveldb, recommended. 
-enabled = true -dir = "./filerldb2" # directory to store level db files - -[leveldb3] -# similar to leveldb2. -# each bucket has its own meta store. -enabled = false -dir = "./filerldb3" # directory to store level db files - -[rocksdb] -# local on disk, similar to leveldb -# since it is using a C wrapper, you need to install rocksdb and build it by yourself -enabled = false -dir = "./filerrdb" # directory to store rocksdb files - -[sqlite] -# local on disk, similar to leveldb -enabled = false -dbFile = "./filer.db" # sqlite db file - -[mysql] # or memsql, tidb -# CREATE TABLE IF NOT EXISTS filemeta ( -# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', -# name VARCHAR(1000) BINARY COMMENT 'directory or file name', -# directory TEXT COMMENT 'full path to parent directory', -# meta LONGBLOB, -# PRIMARY KEY (dirhash, name) -# ) DEFAULT CHARSET=utf8; - -enabled = false -hostname = "localhost" -port = 3306 -username = "root" -password = "" -database = "" # create or use an existing database -connection_max_idle = 2 -connection_max_open = 100 -connection_max_lifetime_seconds = 0 -interpolateParams = false -# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: -enableUpsert = true -upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" - -[mysql2] # or memsql, tidb -enabled = false -createTable = """ - CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` ( - dirhash BIGINT, - name VARCHAR(1000) BINARY, - directory TEXT, - meta LONGBLOB, - PRIMARY KEY (dirhash, name) - ) DEFAULT CHARSET=utf8; -""" -hostname = "localhost" -port = 3306 -username = "root" -password = "" -database = "" # create or use an existing database -connection_max_idle = 2 -connection_max_open = 100 -connection_max_lifetime_seconds = 0 -interpolateParams = false -# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: -enableUpsert = true -upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) 
ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" - -[postgres] # or cockroachdb, YugabyteDB -# CREATE TABLE IF NOT EXISTS filemeta ( -# dirhash BIGINT, -# name VARCHAR(65535), -# directory VARCHAR(65535), -# meta bytea, -# PRIMARY KEY (dirhash, name) -# ); -enabled = false -hostname = "localhost" -port = 5432 -username = "postgres" -password = "" -database = "postgres" # create or use an existing database -schema = "" -sslmode = "disable" -connection_max_idle = 100 -connection_max_open = 100 -connection_max_lifetime_seconds = 0 -# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: -enableUpsert = true -upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta""" - -[postgres2] -enabled = false -createTable = """ - CREATE TABLE IF NOT EXISTS "%s" ( - dirhash BIGINT, - name VARCHAR(65535), - directory VARCHAR(65535), - meta bytea, - PRIMARY KEY (dirhash, name) - ); -""" -hostname = "localhost" -port = 5432 -username = "postgres" -password = "" -database = "postgres" # create or use an existing database -schema = "" -sslmode = "disable" -connection_max_idle = 100 -connection_max_open = 100 -connection_max_lifetime_seconds = 0 -# if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: -enableUpsert = true -upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta""" - -[cassandra] -# CREATE TABLE filemeta ( -# directory varchar, -# name varchar, -# meta blob, -# PRIMARY KEY (directory, name) -# ) WITH CLUSTERING ORDER BY (name ASC); -enabled = false -keyspace="seaweedfs" -hosts=[ - "localhost:9042", -] -username="" -password="" -# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. -superLargeDirectories = [] - -[hbase] -enabled = false -zkquorum = "" -table = "seaweedfs" - -[redis2] -enabled = false -address = "localhost:6379" -password = "" -database = 0 -# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. -superLargeDirectories = [] - -[redis_cluster2] -enabled = false -addresses = [ - "localhost:30001", - "localhost:30002", - "localhost:30003", - "localhost:30004", - "localhost:30005", - "localhost:30006", -] -password = "" -# allows reads from slave servers or the master, but all writes still go to the master -readOnly = false -# automatically use the closest Redis server for reads -routeByLatency = false -# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. -superLargeDirectories = [] - -[etcd] -enabled = false -servers = "localhost:2379" -timeout = "3s" - -[mongodb] -enabled = false -uri = "mongodb://localhost:27017" -option_pool_size = 0 -database = "seaweedfs" - -[elastic7] -enabled = false -servers = [ - "http://localhost1:9200", - "http://localhost2:9200", - "http://localhost3:9200", -] -username = "" -password = "" -sniff_enabled = false -healthcheck_enabled = false -# increase the value is recommend, be sure the value in Elastic is greater or equal here -index.max_result_window = 10000 - - - -########################## -########################## -# To add path-specific filer store: -# -# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp -# 2. Add a location configuraiton. 
E.g., location = "/tmp/" -# 3. Copy and customize all other configurations. -# Make sure they are not the same if using the same store type! -# 4. Set enabled to true -# -# The following is just using redis as an example -########################## -[redis2.tmp] -enabled = false -location = "/tmp/" -address = "localhost:6379" -password = "" -database = 1 - -` - - NOTIFICATION_TOML_EXAMPLE = ` -# A sample TOML config file for SeaweedFS filer store -# Used by both "weed filer" or "weed server -filer" and "weed filer.replicate" -# Put this file to one of the location, with descending priority -# ./notification.toml -# $HOME/.seaweedfs/notification.toml -# /etc/seaweedfs/notification.toml - -#################################################### -# notification -# send and receive filer updates for each file to an external message queue -#################################################### -[notification.log] -# this is only for debugging perpose and does not work with "weed filer.replicate" -enabled = false - - -[notification.kafka] -enabled = false -hosts = [ - "localhost:9092" -] -topic = "seaweedfs_filer" -offsetFile = "./last.offset" -offsetSaveIntervalSeconds = 10 - - -[notification.aws_sqs] -# experimental, let me know if it works -enabled = false -aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials). -aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials). -region = "us-east-2" -sqs_queue_name = "my_filer_queue" # an existing queue name - - -[notification.google_pub_sub] -# read credentials doc at https://cloud.google.com/docs/authentication/getting-started -enabled = false -google_application_credentials = "/path/to/x.json" # path to json credential file -project_id = "" # an existing project id -topic = "seaweedfs_filer_topic" # a topic, auto created if does not exists - -[notification.gocdk_pub_sub] -# The Go Cloud Development Kit (https://gocloud.dev). -# PubSub API (https://godoc.org/gocloud.dev/pubsub). -# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ. -enabled = false -# This URL will Dial the RabbitMQ server at the URL in the environment -# variable RABBIT_SERVER_URL and open the exchange "myexchange". -# The exchange must have already been created by some other means, like -# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then -# create binding myexchange => myqueue -topic_url = "rabbit://myexchange" -sub_url = "rabbit://myqueue" -` - - REPLICATION_TOML_EXAMPLE = ` -# A sample TOML config file for replicating SeaweedFS filer -# Used with "weed filer.backup" -# Using with "weed filer.replicate" is deprecated. -# Put this file to one of the location, with descending priority -# ./replication.toml -# $HOME/.seaweedfs/replication.toml -# /etc/seaweedfs/replication.toml - -[source.filer] # deprecated. Only useful with "weed filer.replicate" -enabled = true -grpcAddress = "localhost:18888" -# all files under this directory tree are replicated. -# this is not a directory on your hard drive, but on your filer. -# i.e., all files with this "prefix" are sent to notification message queue. -directory = "/buckets" - -[sink.local] -enabled = false -directory = "/data" -# all replicated files are under modified time as yyyy-mm-dd directories -# so each date directory contains all new and updated files. 
-is_incremental = false
-
-[sink.filer]
-enabled = false
-grpcAddress = "localhost:18888"
-# all replicated files are under this directory tree
-# this is not a directory on your hard drive, but on your filer.
-# i.e., all received files will be "prefixed" to this directory.
-directory = "/backup"
-replication = ""
-collection = ""
-ttlSec = 0
-is_incremental = false
-
-[sink.s3]
-# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
-# default loads credentials from the shared credentials file (~/.aws/credentials).
-enabled = false
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name" # an existing bucket
-directory = "/" # destination directory
-endpoint = ""
-is_incremental = false
-
-[sink.google_cloud_storage]
-# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
-enabled = false
-google_application_credentials = "/path/to/x.json" # path to json credential file
-bucket = "your_bucket_seaweedfs" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.azure]
-# experimental, let me know if it works
-enabled = false
-account_name = ""
-account_key = ""
-container = "mycontainer" # an existing container
-directory = "/" # destination directory
-is_incremental = false
-
-[sink.backblaze]
-enabled = false
-b2_account_id = ""
-b2_master_application_key = ""
-bucket = "mybucket" # an existing bucket
-directory = "/" # destination directory
-is_incremental = false
-
-`
-
-	SECURITY_TOML_EXAMPLE = `
-# Put this file in one of the locations, with descending priority
-# ./security.toml
-# $HOME/.seaweedfs/security.toml
-# /etc/seaweedfs/security.toml
-# this file is read by master, volume server, and filer
-
-# the jwt signing key is read by master and volume server.
-# a jwt defaults to expire after 10 seconds.
-[jwt.signing]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# jwt for read is only supported with master+volume setup. Filer does not support this mode.
-[jwt.signing.read]
-key = ""
-expires_after_seconds = 10 # seconds
-
-# all grpc tls authentications are mutual
-# the values for the following ca, cert, and key are paths to the PEM files.
-# the host name is not checked, so the PEM files can be shared.
-[grpc]
-ca = ""
-# Set a wildcard domain to enable TLS authentication by common names
-allowed_wildcard_domain = "" # .mycompany.com
-
-[grpc.volume]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.master]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.filer]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-[grpc.msg_broker]
-cert = ""
-key = ""
-allowed_commonNames = "" # comma-separated SSL certificate common names
-
-# use this for any place that needs a grpc client
-# e.g., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
-[grpc.client]
-cert = ""
-key = ""
-
-# volume server https options
-# Note: work in progress!
-# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
-[https.client]
-enabled = true
-[https.volume]
-cert = ""
-key = ""
-
-
-`
-
-	MASTER_TOML_EXAMPLE = `
-# Put this file in one of the locations, with descending priority
-# ./master.toml
-# $HOME/.seaweedfs/master.toml
-# /etc/seaweedfs/master.toml
-# this file is read by master
-
-[master.maintenance]
-# periodically running these scripts is the same as running them from 'weed shell'
-scripts = """
-	lock
-	ec.encode -fullPercent=95 -quietFor=1h
-	ec.rebuild -force
-	ec.balance -force
-	volume.balance -force
-	volume.fix.replication
-	unlock
-"""
-sleep_minutes = 17 # sleep minutes between each script execution
-
-[master.filer]
-default = "localhost:8888" # used by maintenance scripts if the scripts need to use fs related commands
-
-
-[master.sequencer]
-type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
-# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
-# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = "http://127.0.0.1:2379"
-
-
-# configurations for tiered cloud storage
-# old volumes are transparently moved to cloud for cost efficiency
-[storage.backend]
-	[storage.backend.s3.default]
-	enabled = false
-	aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-	aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-	region = "us-east-2"
-	bucket = "your_bucket_name" # an existing bucket
-	endpoint = ""
-
-# create this number of logical volumes if no more writable volumes
-# count_x means how many copies of data.
-# e.g.:
-# 000 has only one copy, copy_1
-# 010 and 001 have two copies, copy_2
-# 011 has three copies, copy_3
-[master.volume_growth]
-copy_1 = 7 # create 1 x 7 = 7 actual volumes
-copy_2 = 6 # create 2 x 6 = 12 actual volumes
-copy_3 = 3 # create 3 x 3 = 9 actual volumes
-copy_other = 1 # create n x 1 = n actual volumes
-
-# configuration flags for replication
-[master.replication]
-# any replication counts should be considered minimums. If you specify 010 and
-# have 3 different racks, that's still considered writable. Writes will still
-# try to replicate to all available volumes. You should only use this option
-# if you are doing your own replication or periodic sync of volumes.
-treat_replication_as_minimums = false
-
-`
-	SHELL_TOML_EXAMPLE = `
-
-[cluster]
-default = "c1"
-
-[cluster.c1]
-master = "localhost:9333" # comma-separated master servers
-filer = "localhost:8888" # filer host and port
-
-[cluster.c2]
-master = ""
-filer = ""
-
-`
-)
diff --git a/weed/command/scaffold/example.go b/weed/command/scaffold/example.go
new file mode 100644
index 000000000..6be6804e5
--- /dev/null
+++ b/weed/command/scaffold/example.go
@@ -0,0 +1,21 @@
+package scaffold
+
+import _ "embed"
+
+//go:embed filer.toml
+var Filer string
+
+//go:embed notification.toml
+var Notification string
+
+//go:embed replication.toml
+var Replication string
+
+//go:embed security.toml
+var Security string
+
+//go:embed master.toml
+var Master string
+
+//go:embed shell.toml
+var Shell string
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
new file mode 100644
index 000000000..9e9258865
--- /dev/null
+++ b/weed/command/scaffold/filer.toml
@@ -0,0 +1,232 @@
+# A sample TOML config file for SeaweedFS filer store
+# Used with "weed filer" or "weed server -filer"
+# Put this file in one of the locations, with descending priority
+# ./filer.toml
+# $HOME/.seaweedfs/filer.toml
+# /etc/seaweedfs/filer.toml
+
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer would check whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
+
+####################################################
+# The following are filer store options
+####################################################
+
+[leveldb2]
+# local on disk, mostly for simple single-machine setup, fairly scalable
+# faster than previous leveldb, recommended.
+enabled = true
+dir = "./filerldb2" # directory to store level db files
+
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it is using a C wrapper, you need to install rocksdb and build it by yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
+
+[sqlite]
+# local on disk, similar to leveldb
+enabled = false
+dbFile = "./filer.db" # sqlite db file
+
+[mysql] # or memsql, tidb
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
+# directory TEXT COMMENT 'full path to parent directory',
+# meta LONGBLOB,
+# PRIMARY KEY (dirhash, name)
+# ) DEFAULT CHARSET=utf8;
+
+enabled = false
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or adjust the upsert query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[mysql2] # or memsql, tidb
+enabled = false
+createTable = """
+  CREATE TABLE IF NOT EXISTS `%s` (
+    dirhash BIGINT,
+    name VARCHAR(1000) BINARY,
+    directory TEXT,
+    meta LONGBLOB,
+    PRIMARY KEY (dirhash, name)
+  ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+# if insert/upsert is failing, you can disable upsert or adjust the upsert query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)"""
+
+[postgres] # or cockroachdb, YugabyteDB
+# CREATE TABLE IF NOT EXISTS filemeta (
+# dirhash BIGINT,
+# name VARCHAR(65535),
+# directory VARCHAR(65535),
+# meta bytea,
+# PRIMARY KEY (dirhash, name)
+# );
+enabled = false
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or adjust the upsert query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[postgres2]
+enabled = false
+createTable = """
+  CREATE TABLE IF NOT EXISTS "%s" (
+    dirhash BIGINT,
+    name VARCHAR(65535),
+    directory VARCHAR(65535),
+    meta bytea,
+    PRIMARY KEY (dirhash, name)
+  );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
+# if insert/upsert is failing, you can disable upsert or adjust the upsert query syntax to match your RDBMS:
+enableUpsert = true
+upsertQuery = """INSERT INTO "%[1]s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4) ON CONFLICT (dirhash,name) DO UPDATE SET meta = EXCLUDED.meta WHERE "%[1]s".meta != EXCLUDED.meta"""
+
+[cassandra]
+# CREATE TABLE filemeta (
+# directory varchar,
+# name varchar,
+# meta blob,
+# PRIMARY KEY (directory, name)
+# ) WITH CLUSTERING ORDER BY (name ASC);
+enabled = false
+keyspace = "seaweedfs"
+hosts = [
+    "localhost:9042",
+]
+username = ""
+password = ""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+# Name of the datacenter local to this filer, used as host selection fallback.
+localDC = ""
+
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
+
+[redis2]
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[redis_cluster2]
+enabled = false
+addresses = [
+    "localhost:30001",
+    "localhost:30002",
+    "localhost:30003",
+    "localhost:30004",
+    "localhost:30005",
+    "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[etcd]
+enabled = false
+servers = "localhost:2379"
+timeout = "3s"
+
+[mongodb]
+enabled = false
+uri = "mongodb://localhost:27017"
+option_pool_size = 0
+database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+    "http://localhost1:9200",
+    "http://localhost2:9200",
+    "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increasing the value is recommended; be sure the value in Elastic is greater than or equal to the value here
+index.max_result_window = 10000
+
+
+
+##########################
+##########################
+# To add path-specific filer store:
+#
+# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
+# 2. Add a location configuration. E.g., location = "/tmp/"
+# 3. Copy and customize all other configurations.
+#    Make sure they are not the same if using the same store type!
+# 4. Set enabled to true
+#
+# The following is just using redis as an example
+##########################
+[redis2.tmp]
+enabled = false
+location = "/tmp/"
+address = "localhost:6379"
+password = ""
+database = 1
diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml
new file mode 100644
index 000000000..020f48e36
--- /dev/null
+++ b/weed/command/scaffold/master.toml
@@ -0,0 +1,63 @@
+# Put this file in one of the locations, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+# periodically running these scripts is the same as running them from 'weed shell'
+scripts = """
+  lock
+  ec.encode -fullPercent=95 -quietFor=1h
+  ec.rebuild -force
+  ec.balance -force
+  volume.deleteEmpty -quietFor=24h -force
+  volume.balance -force
+  volume.fix.replication
+  unlock
+"""
+sleep_minutes = 17 # sleep minutes between each script execution
+
+[master.filer]
+default = "localhost:8888" # used by maintenance scripts if the scripts need to use fs related commands
+
+
+[master.sequencer]
+type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence
+# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
+# example : http://127.0.0.1:2379,http://127.0.0.1:2389
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+# when sequencer.type = snowflake, the snowflake id must be different from other masters
+sequencer_snowflake_id = 0 # any number between 1 and 1023
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+[storage.backend.s3.default]
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+endpoint = ""
+
+# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+# 000 has only one copy, copy_1
+# 010 and 001 have two copies, copy_2
+# 011 has three copies, copy_3
+[master.volume_growth]
+copy_1 = 7 # create 1 x 7 = 7 actual volumes
+copy_2 = 6 # create 2 x 6 = 12 actual volumes
+copy_3 = 3 # create 3 x 3 = 9 actual volumes
+copy_other = 1 # create n x 1 = n actual volumes
+
+# configuration flags for replication
+[master.replication]
+# any replication counts should be considered minimums. If you specify 010 and
+# have 3 different racks, that's still considered writable. Writes will still
+# try to replicate to all available volumes. You should only use this option
+# if you are doing your own replication or periodic sync of volumes.
+treat_replication_as_minimums = false
diff --git a/weed/command/scaffold/notification.toml b/weed/command/scaffold/notification.toml
new file mode 100644
index 000000000..f35101edd
--- /dev/null
+++ b/weed/command/scaffold/notification.toml
@@ -0,0 +1,54 @@
+# A sample TOML config file for SeaweedFS filer notifications
+# Used by "weed filer" or "weed server -filer", and by "weed filer.replicate"
+# Put this file in one of the locations, with descending priority
+# ./notification.toml
+# $HOME/.seaweedfs/notification.toml
+# /etc/seaweedfs/notification.toml
+
+####################################################
+# notification
+# send and receive filer updates for each file to an external message queue
+####################################################
+[notification.log]
+# this is only for debugging purposes and does not work with "weed filer.replicate"
+enabled = false
+
+
+[notification.kafka]
+enabled = false
+hosts = [
+    "localhost:9092"
+]
+topic = "seaweedfs_filer"
+offsetFile = "./last.offset"
+offsetSaveIntervalSeconds = 10
+
+
+[notification.aws_sqs]
+# experimental, let me know if it works
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+sqs_queue_name = "my_filer_queue" # an existing queue name
+
+
+[notification.google_pub_sub]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+project_id = "" # an existing project id
+topic = "seaweedfs_filer_topic" # a topic, auto created if it does not exist
+
+[notification.gocdk_pub_sub]
+# The Go Cloud Development Kit (https://gocloud.dev).
+# PubSub API (https://godoc.org/gocloud.dev/pubsub).
+# Supports AWS SNS/SQS, Azure Service Bus, Google PubSub, NATS and RabbitMQ.
+enabled = false
+# This URL will Dial the RabbitMQ server at the URL in the environment
+# variable RABBIT_SERVER_URL and open the exchange "myexchange".
+# The exchange must have already been created by some other means, like
+# the RabbitMQ management plugin. Create myexchange of type fanout and myqueue, then
+# create a binding myexchange => myqueue
+topic_url = "rabbit://myexchange"
+sub_url = "rabbit://myqueue"
diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml
new file mode 100644
index 000000000..c463c8077
--- /dev/null
+++ b/weed/command/scaffold/replication.toml
@@ -0,0 +1,71 @@
+# A sample TOML config file for replicating SeaweedFS filer
+# Used with "weed filer.backup"
+# Using it with "weed filer.replicate" is deprecated.
+# Put this file in one of the locations, with descending priority
+# ./replication.toml
+# $HOME/.seaweedfs/replication.toml
+# /etc/seaweedfs/replication.toml
+
+[source.filer] # deprecated. Only useful with "weed filer.replicate"
+enabled = true
+grpcAddress = "localhost:18888"
+# all files under this directory tree are replicated.
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all files with this "prefix" are sent to the notification message queue.
+directory = "/buckets"
+
+[sink.local]
+enabled = false
+directory = "/data"
+# all replicated files are stored under yyyy-mm-dd directories named by modified time,
+# so each date directory contains all new and updated files.
+is_incremental = false
+
+[sink.filer]
+enabled = false
+grpcAddress = "localhost:18888"
+# all replicated files are under this directory tree
+# this is not a directory on your hard drive, but on your filer.
+# i.e., all received files will be "prefixed" to this directory.
+directory = "/backup"
+replication = ""
+collection = ""
+ttlSec = 0
+is_incremental = false
+
+[sink.s3]
+# read credentials doc at https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/sessions.html
+# default loads credentials from the shared credentials file (~/.aws/credentials).
+enabled = false
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+directory = "/" # destination directory
+endpoint = ""
+is_incremental = false
+
+[sink.google_cloud_storage]
+# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
+enabled = false
+google_application_credentials = "/path/to/x.json" # path to json credential file
+bucket = "your_bucket_seaweedfs" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.azure]
+# experimental, let me know if it works
+enabled = false
+account_name = ""
+account_key = ""
+container = "mycontainer" # an existing container
+directory = "/" # destination directory
+is_incremental = false
+
+[sink.backblaze]
+enabled = false
+b2_account_id = ""
+b2_master_application_key = ""
+bucket = "mybucket" # an existing bucket
+directory = "/" # destination directory
+is_incremental = false
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
new file mode 100644
index 000000000..0c69b2f24
--- /dev/null
+++ b/weed/command/scaffold/security.toml
@@ -0,0 +1,60 @@
+# Put this file in one of the locations, with descending priority
+# ./security.toml
+# $HOME/.seaweedfs/security.toml
+# /etc/seaweedfs/security.toml
+# this file is read by master, volume server, and filer
+
+# the jwt signing key is read by master and volume server.
+# a jwt defaults to expire after 10 seconds.
+[jwt.signing]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# jwt for read is only supported with master+volume setup. Filer does not support this mode.
+[jwt.signing.read]
+key = ""
+expires_after_seconds = 10 # seconds
+
+# all grpc tls authentications are mutual
+# the values for the following ca, cert, and key are paths to the PEM files.
+# the host name is not checked, so the PEM files can be shared.
+[grpc]
+ca = ""
+# Set a wildcard domain to enable TLS authentication by common names
+allowed_wildcard_domain = "" # .mycompany.com
+
+[grpc.volume]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.master]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.filer]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.msg_broker]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+# use this for any place that needs a grpc client
+# e.g., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
+# volume server https options
+# Note: work in progress!
+# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
+[https.client]
+enabled = true
+[https.volume]
+cert = ""
+key = ""
+
diff --git a/weed/command/scaffold/shell.toml b/weed/command/scaffold/shell.toml
new file mode 100644
index 000000000..288ae2efe
--- /dev/null
+++ b/weed/command/scaffold/shell.toml
@@ -0,0 +1,10 @@
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
diff --git a/weed/command/server.go b/weed/command/server.go
index d0020d33b..fe10b24f7 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -3,6 +3,7 @@ package command
 import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/util/grace"
+	"net/http"
 	"os"
 	"strings"
 	"time"
@@ -16,6 +17,8 @@ import (
 type ServerOptions struct {
 	cpuprofile *string
 	memprofile *string
+	debug      *bool
+	debugPort  *int
 	v          VolumeServerOptions
 }
@@ -78,6 +81,8 @@ var (
 func init() {
 	serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
 	serverOptions.memprofile = cmdServer.Flag.String("memprofile", "", "memory profile output file")
+	serverOptions.debug = cmdServer.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:6060/debug/pprof/goroutine?debug=2")
+	serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging")
 	masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
 	masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@@ -107,10 +112,11 @@ func init() {
 	serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
 	serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 	serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
-	serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+	serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
 	serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
 	serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") + serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") @@ -139,6 +145,10 @@ func init() { func runServer(cmd *Command, args []string) bool { + if *serverOptions.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil) + } + util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) @@ -245,7 +255,7 @@ func runServer(cmd *Command, args []string) bool { // start volume server if *isStartingVolumeServer { - minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent) + minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent) go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, minFreeSpaces) } diff --git a/weed/command/shell.go b/weed/command/shell.go index c9976e809..4a9f4b027 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -55,6 +55,7 @@ func runShell(command *Command, args []string) bool { var err error shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) + shellOptions.FilerAddress = *shellInitialFiler if err != nil { fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err) return false diff --git a/weed/command/upload.go b/weed/command/upload.go index 0f9361b40..9ae1befab 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -71,20 +71,20 @@ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master) + defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master) if err != nil { fmt.Printf("upload: %v", err) return false } if *upload.replication == "" { - *upload.replication = defaultCollection + *upload.replication = defaultReplication } if len(args) == 0 { if *upload.dir == "" { return false } - filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error { + err = filepath.Walk(util.ResolvePath(*upload.dir), func(path string, info os.FileInfo, err error) error { if err == nil { if !info.IsDir() { if *upload.include != "" { @@ -108,12 +108,21 @@ func runUpload(cmd *Command, args []string) bool { } return err }) + if err != nil { + fmt.Println(err.Error()) + return false + } } else { parts, e := operation.NewFileParts(args) if e != nil { fmt.Println(e.Error()) + return false + } + results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + if err != nil { + fmt.Println(err.Error()) + return false } - results, _ := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, 
*upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/weed/command/volume.go b/weed/command/volume.go index 139a3791e..235eff11b 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -35,31 +35,32 @@ var ( ) type VolumeServerOptions struct { - port *int - publicPort *int - folders []string - folderMaxLimits []int - idxFolder *string - ip *string - publicUrl *string - bindIp *string - masters *string - idleConnectionTimeout *int - dataCenter *string - rack *string - whiteList []string - indexType *string - diskType *string - fixJpgOrientation *bool - readRedirect *bool - cpuProfile *string - memProfile *string - compactionMBPerSecond *int - fileSizeLimitMB *int - concurrentUploadLimitMB *int - pprof *bool - preStopSeconds *int - metricsHttpPort *int + port *int + publicPort *int + folders []string + folderMaxLimits []int + idxFolder *string + ip *string + publicUrl *string + bindIp *string + masters *string + idleConnectionTimeout *int + dataCenter *string + rack *string + whiteList []string + indexType *string + diskType *string + fixJpgOrientation *bool + readMode *string + cpuProfile *string + memProfile *string + compactionMBPerSecond *int + fileSizeLimitMB *int + concurrentUploadLimitMB *int + concurrentDownloadLimitMB *int + pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int enableTcp *bool } @@ -80,12 +81,13 @@ func init() { v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") - v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") + v.readMode = cmdVolume.Flag.String("readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") - v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") + v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size") + v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. 
precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") @@ -228,10 +230,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, - *v.fixJpgOrientation, *v.readRedirect, + *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, + int64(*v.concurrentDownloadLimitMB)*1024*1024, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -259,6 +262,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // Stop heartbeats if !volumeServer.StopHeartbeat() { + volumeServer.SetStopping() glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) } |
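The -debug and -debug.port flags added to "weed server" above expose Go's runtime profiler on a side HTTP listener, started with "go http.ListenAndServe(...)" so it does not block server startup. A minimal standalone sketch of the same pattern, assuming the pprof handlers are registered through the usual net/http/pprof blank import somewhere in the binary's import graph; the hypothetical debugPort value simply mirrors the flag default:

package main

import (
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof" // blank import registers the /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	debugPort := 6060 // mirrors the -debug.port default in the diff above
	// Passing a nil handler serves http.DefaultServeMux, where the pprof
	// endpoints now live, e.g.,
	// http://localhost:6060/debug/pprof/goroutine?debug=2
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", debugPort), nil))
}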

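The new weed/command/scaffold/example.go above replaces the old in-source TOML string constants with standalone files embedded at build time via Go's embed package (Go 1.16+). A minimal sketch of the mechanism, using a hypothetical sample.toml placed next to the source file:

package main

import (
	_ "embed" // required when embedding into a plain string or []byte
	"fmt"
)

// The contents of sample.toml (a hypothetical file in the same directory)
// are compiled into the binary, so nothing needs to be read from disk at runtime.
//
//go:embed sample.toml
var sampleToml string

func main() {
	fmt.Print(sampleToml)
}

This keeps the weed binary self-contained while letting the sample configs live as ordinary TOML files in the repository.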