diff options
Diffstat (limited to 'weed/command')
44 files changed, 2336 insertions, 812 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go index 9a545a183..955ce4006 100644 --- a/weed/command/autocomplete.go +++ b/weed/command/autocomplete.go @@ -41,7 +41,7 @@ func AutocompleteMain(commands []*Command) bool { func installAutoCompletion() bool { if runtime.GOOS == "windows" { - fmt.Println("windows is not supported") + fmt.Println("Windows is not supported") return false } @@ -56,7 +56,7 @@ func installAutoCompletion() bool { func uninstallAutoCompletion() bool { if runtime.GOOS == "windows" { - fmt.Println("windows is not supported") + fmt.Println("Windows is not supported") return false } @@ -65,7 +65,7 @@ func uninstallAutoCompletion() bool { fmt.Printf("uninstall failed! %s\n", err) return false } - fmt.Printf("autocompletion is disable. Please restart your shell.\n") + fmt.Printf("autocompletion is disabled. Please restart your shell.\n") return true } diff --git a/weed/command/backup.go b/weed/command/backup.go index 4c5a2d820..c43b0d351 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -72,12 +73,12 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String()) + lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations[0].ServerAddress() stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) if err != nil { @@ -119,7 +120,7 @@ func runBackup(cmd *Command, args []string) 
bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(30*1024*1024*1024, 0); err != nil { + if err = v.Compact2(0, 0, nil); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index f0c8f6139..9f18cc5b9 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -3,6 +3,7 @@ package command import ( "bufio" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "math" "math/rand" @@ -10,7 +11,6 @@ import ( "runtime" "runtime/pprof" "sort" - "strings" "sync" "time" @@ -74,14 +74,14 @@ func init() { var cmdBenchmark = &Command{ UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000", - Short: "benchmark on writing millions of files and read out", + Short: "benchmark by writing millions of files and reading them out", Long: `benchmark on an empty SeaweedFS file system. Two tests during benchmark: 1) write lots of small files to the system 2) read the files out - The file content is mostly zero, but no compression is done. + The file content is mostly zeros, but no compression is done. You can choose to only benchmark read or write. During write, the list of uploaded file ids is stored in "-list" specified file. 
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ",")) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() @@ -468,7 +468,7 @@ func (s *stats) printStats() { timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) - fmt.Printf("Complete requests: %d\n", completed) + fmt.Printf("Completed requests: %d\n", completed) fmt.Printf("Failed requests: %d\n", failed) fmt.Printf("Total transferred: %d bytes\n", transferred) fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken) diff --git a/weed/command/command.go b/weed/command/command.go index 8d6525652..7635405dc 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -2,9 +2,10 @@ package command import ( "fmt" - flag "github.com/chrislusf/seaweedfs/weed/util/fla9" "os" "strings" + + flag "github.com/chrislusf/seaweedfs/weed/util/fla9" ) var Commands = []*Command{ @@ -21,6 +22,7 @@ var Commands = []*Command{ cmdFilerCopy, cmdFilerMetaBackup, cmdFilerMetaTail, + cmdFilerRemoteGateway, cmdFilerRemoteSynchronize, cmdFilerReplicate, cmdFilerSynchronize, @@ -35,6 +37,7 @@ var Commands = []*Command{ cmdScaffold, cmdServer, cmdShell, + cmdUpdate, cmdUpload, cmdVersion, cmdVolume, diff --git a/weed/command/compact.go b/weed/command/compact.go index 92e25f474..6df28440a 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -50,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } else { - if err = v.Compact2(preallocate, 0); err != nil { + if err = v.Compact2(preallocate, 0, 
nil); err != nil { glog.Fatalf("Compact Volume [ERROR] %s\n", err) } } diff --git a/weed/command/download.go b/weed/command/download.go index a64d3f237..a3c05b53d 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -2,16 +2,17 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/security" - "google.golang.org/grpc" "io" - "io/ioutil" "net/http" "os" "path" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -49,7 +50,7 @@ func runDownload(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for _, fid := range args { - if e := downloadToFile(func() string { return *d.server }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { + if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } @@ -81,7 +82,7 @@ func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpti } defer f.Close() if isFileList { - content, err := ioutil.ReadAll(rc.Body) + content, err := io.ReadAll(rc.Body) if err != nil { return err } @@ -118,7 +119,7 @@ func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption return "", nil, e } defer util.CloseResponse(rc) - content, e = ioutil.ReadAll(rc.Body) + content, e = io.ReadAll(rc.Body) return } diff --git a/weed/command/filer.go b/weed/command/filer.go index ddee0852c..c9f9a1956 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -2,10 +2,10 @@ package command import ( "fmt" + "net" "net/http" "os" - "strconv" - "strings" + "runtime" "time" "google.golang.org/grpc/reflection" @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" 
"github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -30,11 +30,14 @@ var ( ) type FilerOptions struct { - masters *string + masters map[string]pb.ServerAddress + mastersString *string ip *string bindIp *string port *int + portGrpc *int publicPort *int + filerGroup *string collection *string defaultReplicaPlacement *string disableDirListing *bool @@ -45,22 +48,25 @@ type FilerOptions struct { enableNotification *bool disableHttp *bool cipher *bool - peers *string metricsHttpPort *int saveToFilerLimit *int defaultLevelDbDirectory *string concurrentUploadLimitMB *int debug *bool debugPort *int + localSocket *string + showUIDirectoryDelete *bool } func init() { cmdFiler.Run = runFiler // break init cycle - f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") - f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to") + f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") + f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port") f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. 
If not specified, use master setting.") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") @@ -70,22 +76,26 @@ func init() { f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack") f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") - f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") + f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock") + f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port") + filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", 
"", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") - filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") + filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") + filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders") + filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") @@ -96,10 +106,11 @@ func init() { filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file") filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") - filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB") + filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") // start iam on filer filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service") + filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address") filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port") } @@ -134,10 +145,12 @@ func runFiler(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*f.metricsHttpPort) - filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port) + filerAddress := util.JoinHostPort(*f.ip, *f.port) 
startDelay := time.Duration(2) if *filerStartS3 { filerS3Options.filer = &filerAddress + filerS3Options.bindIp = f.bindIp + filerS3Options.localFilerSocket = f.localSocket go func() { time.Sleep(startDelay * time.Second) filerS3Options.startS3Server() @@ -156,13 +169,15 @@ func runFiler(cmd *Command, args []string) bool { if *filerStartIam { filerIamOptions.filer = &filerAddress - filerIamOptions.masters = f.masters + filerIamOptions.masters = f.mastersString go func() { time.Sleep(startDelay * time.Second) filerIamOptions.startIamServer() }() } + f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap() + f.startFiler() return true @@ -176,16 +191,20 @@ func (fo *FilerOptions) startFiler() { if *fo.publicPort != 0 { publicVolumeMux = http.NewServeMux() } + if *fo.portGrpc == 0 { + *fo.portGrpc = 10000 + *fo.port + } + if *fo.bindIp == "" { + *fo.bindIp = *fo.ip + } defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") - var peers []string - if *fo.peers != "" { - peers = strings.Split(*fo.peers, ",") - } + filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*fo.masters, ","), + Masters: fo.masters, + FilerGroup: *fo.filerGroup, Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, DisableDirListing: *fo.disableDirListing, @@ -195,21 +214,20 @@ func (fo *FilerOptions) startFiler() { Rack: *fo.rack, DefaultLevelDbDir: defaultLevelDbDirectory, DisableHttp: *fo.disableHttp, - Host: *fo.ip, - Port: uint32(*fo.port), + Host: filerAddress, Cipher: *fo.cipher, SaveToFilerLimit: int64(*fo.saveToFilerLimit), - Filers: peers, ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, + ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) } if *fo.publicPort != 0 { - publicListeningAddress := 
*fo.bindIp + ":" + strconv.Itoa(*fo.publicPort) + publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort) glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, 0) + publicListener, localPublicListner, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0) if e != nil { glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e) } @@ -218,11 +236,18 @@ func (fo *FilerOptions) startFiler() { glog.Fatalf("Volume server fail to serve public: %v", e) } }() + if localPublicListner != nil { + go func() { + if e := http.Serve(localPublicListner, publicVolumeMux); e != nil { + glog.Errorf("Volume server fail to serve public: %v", e) + } + }() + } } glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port) - filerListener, e := util.NewListener( - *fo.bindIp+":"+strconv.Itoa(*fo.port), + filerListener, filerLocalListener, e := util.NewIpAndLocalListeners( + *fo.bindIp, *fo.port, time.Duration(10)*time.Second, ) if e != nil { @@ -230,17 +255,43 @@ func (fo *FilerOptions) startFiler() { } // starting grpc server - grpcPort := *fo.port + 10000 - grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *fo.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } go grpcS.Serve(grpcL) httpS := &http.Server{Handler: defaultMux} + if runtime.GOOS != "windows" { + if *fo.localSocket == "" { + *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port) + } + if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) { + 
glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error()) + } + go func() { + // start on local unix socket + filerSocketListener, err := net.Listen("unix", *fo.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err) + } + httpS.Serve(filerSocketListener) + }() + } + if filerLocalListener != nil { + go func() { + if err := httpS.Serve(filerLocalListener); err != nil { + glog.Errorf("Filer Fail to serve: %v", e) + } + }() + } if err := httpS.Serve(filerListener); err != nil { glog.Fatalf("Filer Fail to serve: %v", e) } diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 0c450181b..d191c693b 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -54,8 +54,10 @@ func runFilerBackup(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + clientId := util.RandomInt32() + for { - err := doFilerBackup(grpcDialOption, &filerBackupOptions) + err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId) if err != nil { glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) time.Sleep(1747 * time.Millisecond) @@ -69,7 +71,7 @@ const ( BackupKeyPrefix = "backup." 
) -func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error { +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error { // find data sink config := util.GetViper() @@ -78,7 +80,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return fmt.Errorf("no data sink configured in replication.toml") } - sourceFiler := *backupOption.filer + sourceFiler := pb.ServerAddress(*backupOption.filer) sourcePath := *backupOption.path timeAgo := *backupOption.timeAgo targetPath := dataSink.GetSinkToDirectory() @@ -102,7 +104,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler) + filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) @@ -112,7 +114,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), - sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 09f5e97fe..ada843dea 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" - "math" 
"net/url" "os" "strings" @@ -23,7 +22,7 @@ var ( type FilerCatOptions struct { grpcDialOption grpc.DialOption - filerAddress string + filerAddress pb.ServerAddress filerClient filer_pb.SeaweedFilerClient output *string } @@ -78,7 +77,7 @@ func runFilerCat(cmd *Command, args []string) bool { return false } - filerCat.filerAddress = filerUrl.Host + filerCat.filerAddress = pb.ServerAddress(filerUrl.Host) filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") dir, name := util.FullPath(urlPath).DirAndName() @@ -97,7 +96,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, @@ -115,7 +114,7 @@ func runFilerCat(cmd *Command, args []string) bool { filerCat.filerClient = client - return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, int64(filer.FileSize(respLookupEntry.Entry))) }) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 818ae5f23..f20ae99bf 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -3,11 +3,8 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "io" - "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "strconv" @@ -17,14 +14,14 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/util/grace" - + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" 
"github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -74,7 +71,7 @@ var cmdFilerCopy = &Command{ It can copy one or a list of files or folders. If copying a whole folder recursively: - All files under the folder and subfolders will be copyed. + All files under the folder and sub folders will be copied. Optional parameter "-include" allows you to specify the file name patterns. If "maxMB" is set to a positive number, files larger than it would be split into chunks. @@ -92,35 +89,21 @@ func runCopy(cmd *Command, args []string) bool { filerDestination := args[len(args)-1] fileOrDirs := args[0 : len(args)-1] - filerUrl, err := url.Parse(filerDestination) + filerAddress, urlPath, err := pb.ParseUrl(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - urlPath := filerUrl.Path if !strings.HasSuffix(urlPath, "/") { fmt.Printf("The last argument should be a folder and end with \"/\"\n") return false } - if filerUrl.Port() == "" { - fmt.Printf("The filer port should be specified.\n") - return false - } - - filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64) - if parseErr != nil { - fmt.Printf("The filer port parse error: %v\n", parseErr) - return false - } - - filerGrpcPort := filerPort + 10000 - filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) + masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress) if err != nil { - fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + fmt.Printf("read from filer %s: %v\n", filerAddress, err) return false } if strings.HasPrefix(urlPath, dirBuckets+"/") { @@ 
-174,9 +157,8 @@ func runCopy(cmd *Command, args []string) bool { go func() { defer waitGroup.Done() worker := FileCopyWorker{ - options: ©, - filerHost: filerUrl.Host, - filerGrpcAddress: filerGrpcAddress, + options: ©, + filerAddress: filerAddress, } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -189,8 +171,8 @@ func runCopy(cmd *Command, args []string) bool { return true } -func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -228,9 +210,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } if mode.IsDir() { - files, _ := ioutil.ReadDir(fileOrDir) + files, _ := os.ReadDir(fileOrDir) for _, subFileOrDir := range files { - cleanedDestDirectory := filepath.Clean(destPath + fi.Name()) + cleanedDestDirectory := destPath + fi.Name() if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil { return err } @@ -241,9 +223,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } type FileCopyWorker struct { - options *CopyOptions - filerHost string - filerGrpcAddress string + options *CopyOptions + filerAddress 
pb.ServerAddress } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -321,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -356,14 +337,14 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 { mimeType = detectMimeType(f) - data, err := ioutil.ReadAll(f) + data, err := io.ReadAll(f) if err != nil { return err } - // assign a volume - err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("upload", func() error { + // assign a volume + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -381,50 +362,62 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if assignResult.Error != "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) } - if assignResult.Url == "" { + if assignResult.Location.Url == "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult) } return nil }) - }) - if err != nil { - return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) - } + if assignErr != nil { + return assignErr + } + + // upload data + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId + uploadOption := &operation.UploadOption{ + 
UploadUrl: targetUrl, + Filename: fileName, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: mimeType, + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err := operation.UploadData(data, uploadOption) + if err != nil { + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + } + if uploadResult.Error != "" { + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + } + if *worker.options.verbose { + fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth)) + return nil + }) if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - } - if *worker.options.verbose { - fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + return fmt.Errorf("upload %v: %v\n", fileName, err) } - chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: 
&filer_pb.FuseAttributes{ - Crtime: time.Now().Unix(), - Mtime: time.Now().Unix(), - Gid: task.gid, - Uid: task.uid, - FileSize: uint64(task.fileSize), - FileMode: uint32(task.fileMode), - Mime: mimeType, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), + Mime: mimeType, + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, @@ -435,7 +428,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } return nil @@ -466,7 +459,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -487,10 +480,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, }) }) if err != nil { - fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) + return } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + 
"/" + assignResult.FileId if collection == "" { collection = assignResult.Collection } @@ -498,7 +492,16 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, replication = assignResult.Replication } - uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: fileName + "-" + strconv.FormatInt(i+1, 10), + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption) if err != nil { uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) return @@ -525,8 +528,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func() string { - return copy.masters[0] + operation.DeleteFiles(func() pb.ServerAddress { + return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError } @@ -536,22 +539,20 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ - Crtime: time.Now().Unix(), - Mtime: time.Now().Unix(), - Gid: task.gid, 
- Uid: task.uid, - FileSize: uint64(task.fileSize), - FileMode: uint32(task.fileMode), - Mime: mimeType, - Replication: replication, - Collection: collection, - TtlSec: worker.options.ttlSec, + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), + Mime: mimeType, + TtlSec: worker.options.ttlSec, }, Chunks: manifestedChunks, }, @@ -562,10 +563,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) return nil } @@ -594,7 +595,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -616,7 +617,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) collection, replication = resp.Collection, resp.Replication return nil @@ -630,8 +631,16 @@ 
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr) } - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, flushErr, _ := operation.Upload(fileUrl, name, worker.options.cipher, reader, false, "", nil, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId), + Filename: name, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, flushErr, _ := operation.Upload(reader, uploadOption) if flushErr != nil { return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 6fe323fba..cf679885d 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -27,7 +27,8 @@ type FilerMetaBackupOptions struct { restart *bool backupFilerConfig *string - store filer.FilerStore + store filer.FilerStore + clientId int32 } func init() { @@ -36,6 +37,7 @@ func init() { metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer") metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup") metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store") + metaBackup.clientId = util.RandomInt32() } var cmdFilerMetaBackup = &Command{ @@ -160,24 +162,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { ctx := context.Background() message := resp.EventNotification - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil - } - if message.OldEntry == nil && message.NewEntry != nil { + } else if filer_pb.IsCreate(resp) { println("+", 
util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) return store.InsertEntry(ctx, entry) - } - if message.OldEntry != nil && message.NewEntry == nil { + } else if filer_pb.IsDelete(resp) { println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)) - } - if message.OldEntry != nil && message.NewEntry != nil { - if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { - println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) - entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) - return store.UpdateEntry(ctx, entry) - } + } else if filer_pb.IsUpdate(resp) { + println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)) + entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry) + return store.UpdateEntry(ctx, entry) + } else { + // renaming println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name)) if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil { return err @@ -195,8 +194,8 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", - *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, + *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } @@ -222,9 +221,9 @@ func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) err var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) -func (metaBackup *FilerMetaBackupOptions) 
WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 28c0db99b..66a87c3d9 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -1,12 +1,9 @@ package command import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/golang/protobuf/jsonpb" - jsoniter "github.com/json-iterator/go" - "github.com/olivere/elastic/v7" "os" "path/filepath" "strings" @@ -28,8 +25,11 @@ var cmdFilerMetaTail = &Command{ weed filer.meta.tail -timeAgo=30h | grep truncate weed filer.meta.tail -timeAgo=30h | jq . + weed filer.meta.tail -timeAgo=30h -untilTimeAgo=20h | jq . weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name + weed filer.meta.tail -timeAgo=30h -es=http://<elasticSearchServerHost>:<port> -es.index=seaweedfs + `, } @@ -37,6 +37,7 @@ var ( tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port") tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer") tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + tailStop = cmdFilerMetaTail.Flag.Duration("untilTimeAgo", 0, "read until this time ago. \"300ms\", \"1.5h\" or \"2h45m\". 
Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>") esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name") @@ -46,6 +47,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + clientId := util.RandomInt32() var filterFunc func(dir, fname string) bool if *tailPattern != "" { @@ -71,12 +73,12 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { + if filer_pb.IsEmpty(resp) { + return false + } if filterFunc == nil { return true } - if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { - return false - } if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { return true } @@ -103,9 +105,13 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", - *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0, - func(resp *filer_pb.SubscribeMetadataResponse) error { + var untilTsNs int64 + if *tailStop != 0 { + untilTsNs = time.Now().Add(-*tailStop).UnixNano() + } + + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil, + time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { return nil } @@ -113,7 +119,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { return err } return nil - }, false) + }, pb.TrivialOnError) if tailErr != nil { 
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) @@ -121,72 +127,3 @@ func runFilerMetaTail(cmd *Command, args []string) bool { return true } - -type EsDocument struct { - Dir string `json:"dir,omitempty"` - Name string `json:"name,omitempty"` - IsDirectory bool `json:"isDir,omitempty"` - Size uint64 `json:"size,omitempty"` - Uid uint32 `json:"uid,omitempty"` - Gid uint32 `json:"gid,omitempty"` - UserName string `json:"userName,omitempty"` - Collection string `json:"collection,omitempty"` - Crtime int64 `json:"crtime,omitempty"` - Mtime int64 `json:"mtime,omitempty"` - Mime string `json:"mime,omitempty"` -} - -func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { - entry := event.NewEntry - dir, name := event.NewParentPath, entry.Name - id := util.Md5String([]byte(util.NewFullPath(dir, name))) - esEntry := &EsDocument{ - Dir: dir, - Name: name, - IsDirectory: entry.IsDirectory, - Size: entry.Attributes.FileSize, - Uid: entry.Attributes.Uid, - Gid: entry.Attributes.Gid, - UserName: entry.Attributes.UserName, - Collection: entry.Attributes.Collection, - Crtime: entry.Attributes.Crtime, - Mtime: entry.Attributes.Mtime, - Mime: entry.Attributes.Mime, - } - return esEntry, id -} - -func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { - options := []elastic.ClientOptionFunc{} - options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) - options = append(options, elastic.SetSniff(false)) - options = append(options, elastic.SetHealthcheck(false)) - client, err := elastic.NewClient(options...) 
- if err != nil { - return nil, err - } - return func(resp *filer_pb.SubscribeMetadataResponse) error { - event := resp.EventNotification - if event.OldEntry != nil && - (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { - // delete or not update the same file - dir, name := resp.Directory, event.OldEntry.Name - id := util.Md5String([]byte(util.NewFullPath(dir, name))) - println("delete", id) - _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) - return err - } - if event.NewEntry != nil { - // add a new file or update the same file - esEntry, id := toEsEntry(event) - value, err := jsoniter.Marshal(esEntry) - if err != nil { - return err - } - println(string(value)) - _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) - return err - } - return nil - }, nil -} diff --git a/weed/command/filer_meta_tail_elastic.go b/weed/command/filer_meta_tail_elastic.go new file mode 100644 index 000000000..5776c4f97 --- /dev/null +++ b/weed/command/filer_meta_tail_elastic.go @@ -0,0 +1,80 @@ +//go:build elastic +// +build elastic + +package command + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + jsoniter "github.com/json-iterator/go" + elastic "github.com/olivere/elastic/v7" + "strings" +) + +type EsDocument struct { + Dir string `json:"dir,omitempty"` + Name string `json:"name,omitempty"` + IsDirectory bool `json:"isDir,omitempty"` + Size uint64 `json:"size,omitempty"` + Uid uint32 `json:"uid,omitempty"` + Gid uint32 `json:"gid,omitempty"` + UserName string `json:"userName,omitempty"` + Crtime int64 `json:"crtime,omitempty"` + Mtime int64 `json:"mtime,omitempty"` + Mime string `json:"mime,omitempty"` +} + +func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { + entry := event.NewEntry + dir, name := event.NewParentPath, entry.Name + id := 
util.Md5String([]byte(util.NewFullPath(dir, name))) + esEntry := &EsDocument{ + Dir: dir, + Name: name, + IsDirectory: entry.IsDirectory, + Size: entry.Attributes.FileSize, + Uid: entry.Attributes.Uid, + Gid: entry.Attributes.Gid, + UserName: entry.Attributes.UserName, + Crtime: entry.Attributes.Crtime, + Mtime: entry.Attributes.Mtime, + Mime: entry.Attributes.Mime, + } + return esEntry, id +} + +func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { + options := []elastic.ClientOptionFunc{} + options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) + options = append(options, elastic.SetSniff(false)) + options = append(options, elastic.SetHealthcheck(false)) + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return func(resp *filer_pb.SubscribeMetadataResponse) error { + event := resp.EventNotification + if event.OldEntry != nil && + (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { + // delete or not update the same file + dir, name := resp.Directory, event.OldEntry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + println("delete", id) + _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) + return err + } + if event.NewEntry != nil { + // add a new file or update the same file + esEntry, id := toEsEntry(event) + value, err := jsoniter.Marshal(esEntry) + if err != nil { + return err + } + println(string(value)) + _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) + return err + } + return nil + }, nil +} diff --git a/weed/command/filer_meta_tail_non_elastic.go b/weed/command/filer_meta_tail_non_elastic.go new file mode 100644 index 000000000..f78f3ee09 --- /dev/null +++ b/weed/command/filer_meta_tail_non_elastic.go @@ -0,0 +1,14 @@ +//go:build !elastic +// +build !elastic + +package command + 
+import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { + return func(resp *filer_pb.SubscribeMetadataResponse) error { + return nil + }, nil +} diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go new file mode 100644 index 000000000..33454f378 --- /dev/null +++ b/weed/command/filer_remote_gateway.go @@ -0,0 +1,119 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "os" + "time" +) + +type RemoteGatewayOptions struct { + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + timeAgo *time.Duration + createBucketAt *string + createBucketRandomSuffix *bool + include *string + exclude *string + + mappings *remote_pb.RemoteStorageMapping + remoteConfs map[string]*remote_pb.RemoteConf + bucketsDir string + clientId int32 +} + +var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) + +func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) +} +func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +var ( + remoteGatewayOptions RemoteGatewayOptions +) + +func init() { + cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle + remoteGatewayOptions.filerAddress = 
cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") + remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") + remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts") + remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") + remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "patterns of new bucket names, e.g., s3*") + remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "patterns of new bucket names, e.g., local*") + remoteGatewayOptions.clientId = util.RandomInt32() +} + +var cmdFilerRemoteGateway = &Command{ + UsageLine: "filer.remote.gateway", + Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote object store", + Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote object store + + filer.remote.gateway listens on filer local buckets update events. + If any bucket is created, deleted, or updated, it will mirror the changes to remote object store.
+ + weed filer.remote.gateway -createBucketAt=cloud1 + +`, +} + +func runFilerRemoteGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + remoteGatewayOptions.grpcDialOption = grpcDialOption + + filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress) + + filerSource := &source.FilerSource{} + filerSource.DoInitialize( + filerAddress.ToHttpAddress(), + filerAddress.ToGrpcAddress(), + "/", // does not matter + *remoteGatewayOptions.readChunkFromFiler, + ) + + remoteGatewayOptions.bucketsDir = "/buckets" + // check buckets again + remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + remoteGatewayOptions.bucketsDir = resp.DirBuckets + return nil + }) + + // read filer remote storage mount mappings + if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil { + fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) + return true + } + + // synchronize /buckets folder + fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) + util.RetryForever("filer.remote.sync buckets", func() error { + return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err) + } + return true + }) + return true + +} diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go new file mode 100644 index 000000000..c3ff756db --- /dev/null +++ b/weed/command/filer_remote_gateway_buckets.go @@ -0,0 +1,400 @@ +package command + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + 
"github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "math" + "math/rand" + "path/filepath" + "strings" + "time" +) + +func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { + + // read filer remote storage mount mappings + if detectErr := option.collectRemoteStorageConf(); detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) +} + +func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + + handleCreateBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + if entry.RemoteEntry != nil { + // this directory is imported from "remote.mount.buckets" or "remote.mount" + 
return nil + } + if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" { + *option.createBucketAt = option.mappings.PrimaryBucketStorageName + glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt) + } + if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" { + for k := range option.mappings.Mappings { + *option.createBucketAt = k + glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt) + } + } + if *option.createBucketAt == "" { + return nil + } + remoteConf, found := option.remoteConfs[*option.createBucketAt] + if !found { + return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt) + } + + client, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + bucketName := strings.ToLower(entry.Name) + if *option.include != "" { + if ok, _ := filepath.Match(*option.include, entry.Name); !ok { + return nil + } + } + if *option.exclude != "" { + if ok, _ := filepath.Match(*option.exclude, entry.Name); ok { + return nil + } + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + remoteLocation, found := option.mappings.Mappings[string(bucketPath)] + if !found { + if *option.createBucketRandomSuffix { + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html + if len(bucketName)+5 > 63 { + bucketName = bucketName[:58] + } + bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000) + } + remoteLocation = &remote_pb.RemoteStorageLocation{ + Name: *option.createBucketAt, + Bucket: bucketName, + Path: "/", + } + // need to add new mapping here before getting updates from metadata tailing + option.mappings.Mappings[string(bucketPath)] = remoteLocation + } else { + bucketName = remoteLocation.Bucket + } + + glog.V(0).Infof("create bucket %s", bucketName) + if err := client.CreateBucket(bucketName); err != nil { + return fmt.Errorf("create bucket %s in %s: %v", bucketName, 
remoteConf.Name, err) + } + + return filer.InsertMountMapping(option, string(bucketPath), remoteLocation) + + } + handleDeleteBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + + client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name) + if err != nil { + return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err) + } + + glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket) + if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil { + return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err) + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + + return filer.DeleteMountMapping(option, string(bucketPath)) + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry != nil { + // update + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + option.mappings = newMappings + } + if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + option.remoteConfs[conf.Name] = conf + } + } else if message.OldEntry != nil { + // deletion + if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err) + } + delete(option.remoteConfs, conf.Name) + } + } + + return nil + } + + eachEntryFunc := 
func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if filer_pb.IsEmpty(resp) { + return nil + } + if filer_pb.IsCreate(resp) { + if message.NewParentPath == option.bucketsDir { + return handleCreateBucket(message.NewEntry) + } + if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/.uploads/") { + return nil + } + if !filer.HasData(message.NewEntry) { + return nil + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if filer_pb.IsDelete(resp) { + if resp.Directory == option.bucketsDir { + return handleDeleteBucket(message.OldEntry) + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(bucket, 
util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + if resp.Directory == option.bucketsDir { + if message.NewParentPath == option.bucketsDir { + if message.OldEntry.Name == message.NewEntry.Name { + return nil + } + if err := handleCreateBucket(message.NewEntry); err != nil { + return err + } + if err := handleDeleteBucket(message.OldEntry); err != nil { + return err + } + } + } + oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory) + newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath) + if oldOk && newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + // update the same entry + if message.NewEntry.IsDirectory { + // update directory property + return nil + } + if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry) + } else { + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, 
newDest) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + } + + // the following is entry rename + if oldOk { + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + if message.OldEntry.IsDirectory { + return client.RemoveDirectory(oldDest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + } + if newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(newRemoteStorage) + if err != nil { + return err + } + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + if message.NewEntry.IsDirectory { + return client.WriteDirectory(newDest, message.NewEntry) + } + remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + + return nil + } + return eachEntryFunc, nil +} + +func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { + bucket := util.FullPath(option.bucketsDir).Child(bucketName) + + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket) + } + remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name] + if 
!hasClient { + return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + } + + client, err = remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return nil, remoteStorageMountLocation, err + } + return client, remoteStorageMountLocation, nil +} + +func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { + bucket, ok = extractBucketPath(option.bucketsDir, actualDir) + if !ok { + return "", nil, nil, false + } + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + glog.Warningf("%s is not mounted", bucket) + return "", nil, nil, false + } + var hasClient bool + remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name] + if !hasClient { + glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + return "", nil, nil, false + } + return bucket, remoteStorageMountLocation, remoteConf, true +} + +func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { + if !strings.HasPrefix(dir, bucketsDir+"/") { + return "", false + } + parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2) + return util.FullPath(bucketsDir).Child(parts[0]), true +} + +func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) { + + if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil { + return err + } else { + option.mappings = mappings + } + + option.remoteConfs = make(map[string]*remote_pb.RemoteConf) + var lastConfName string + err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error { + if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &remote_pb.RemoteConf{} + if err := 
proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) + } + option.remoteConfs[conf.Name] = conf + lastConfName = conf.Name + return nil + }, "", false, math.MaxUint32) + + if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 { + glog.V(0).Infof("%s is set to the default remote storage", lastConfName) + option.mappings.PrimaryBucketStorageName = lastConfName + } + + return +} diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 8d2719660..d6ccf7b79 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -1,14 +1,10 @@ package command import ( - "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -20,19 +16,15 @@ type RemoteSyncOptions struct { filerAddress *string grpcDialOption grpc.DialOption readChunkFromFiler *bool - debug *bool timeAgo *time.Duration dir *string + clientId int32 } -const ( - RemoteSyncKeyPrefix = "remote.sync." 
-) - var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) -func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -47,20 +39,28 @@ var ( func init() { cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") - remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") + remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") - remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") - remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". 
Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + remoteSyncOptions.clientId = util.RandomInt32() } var cmdFilerRemoteSynchronize = &Command{ - UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud", - Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage", - Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage + UsageLine: "filer.remote.sync", + Short: "resumable continuously write back updates to remote storage", + Long: `resumable continuously write back updates to remote storage filer.remote.sync listens on filer update events. If any mounted remote file is updated, it will fetch the updated content, and write to the remote storage. + + weed filer.remote.sync -dir=/mount/s3_on_cloud + + The metadata sync starting time is determined with the following priority order: + 1. specified by timeAgo + 2. last sync timestamp for this directory + 3. 
directory creation time + `, } @@ -71,176 +71,29 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { remoteSyncOptions.grpcDialOption = grpcDialOption dir := *remoteSyncOptions.dir - filerAddress := *remoteSyncOptions.filerAddress + filerAddress := pb.ServerAddress(*remoteSyncOptions.filerAddress) filerSource := &source.FilerSource{} filerSource.DoInitialize( - filerAddress, - pb.ServerToGrpcAddress(filerAddress), + filerAddress.ToHttpAddress(), + filerAddress.ToGrpcAddress(), "/", // does not matter *remoteSyncOptions.readChunkFromFiler, ) - fmt.Printf("synchronize %s to remote storage...\n", dir) - util.RetryForever("filer.remote.sync "+dir, func() error { - return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) - }, func(err error) bool { - if err != nil { - glog.Errorf("synchronize %s: %v", dir, err) - } - return true - }) - - return true -} - -func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { - - // read filer remote storage mount mappings - _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir) - if detectErr != nil { - return fmt.Errorf("read mount info: %v", detectErr) - } - - dirHash := util.HashStringToLong(mountedDir) - - // 1. specified by timeAgo - // 2. last offset timestamp for this directory - // 3. 
directory creation time - var lastOffsetTs time.Time - if *option.timeAgo == 0 { - mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) - if err != nil { - return fmt.Errorf("lookup %s: %v", mountedDir, err) - } - - lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) - if mountedDirEntry != nil { - if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { - lastOffsetTs = time.Unix(0, lastOffsetTsNs) - glog.V(0).Infof("resume from %v", lastOffsetTs) - } else { - lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) - } - } else { - lastOffsetTs = time.Now() - } - } else { - lastOffsetTs = time.Now().Add(-*option.timeAgo) - } - - client, err := remote_storage.GetRemoteStorage(remoteStorage) - if err != nil { - return err - } - - eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - if message.OldEntry == nil && message.NewEntry == nil { - return nil - } - if message.OldEntry == nil && message.NewEntry != nil { - if !filer.HasData(message.NewEntry) { - return nil - } - glog.V(2).Infof("create: %+v", resp) - if !shouldSendToRemote(message.NewEntry) { - glog.V(2).Infof("skipping creating: %+v", resp) - return nil - } - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) - if message.NewEntry.IsDirectory { - glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) - return client.WriteDirectory(dest, message.NewEntry) - } - glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) - reader := filer.NewFileReader(filerSource, message.NewEntry) - remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) - if writeErr != nil { - return writeErr - } - return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) - } - if message.OldEntry != 
nil && message.NewEntry == nil { - glog.V(2).Infof("delete: %+v", resp) - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) - if message.OldEntry.IsDirectory { - glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) - return client.RemoveDirectory(dest) + if dir != "" { + fmt.Printf("synchronize %s to remote storage...\n", dir) + util.RetryForever("filer.remote.sync "+dir, func() error { + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", dir, err) } - glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) - return client.DeleteFile(dest) - } - if message.OldEntry != nil && message.NewEntry != nil { - oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) - if !shouldSendToRemote(message.NewEntry) { - glog.V(2).Infof("skipping updating: %+v", resp) - return nil - } - if message.NewEntry.IsDirectory { - return client.WriteDirectory(dest, message.NewEntry) - } - if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { - if filer.IsSameData(message.OldEntry, message.NewEntry) { - glog.V(2).Infof("update meta: %+v", resp) - return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) - } - } - glog.V(2).Infof("update: %+v", resp) - glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) - if err := client.DeleteFile(oldDest); err != nil { - return err - } - reader := filer.NewFileReader(filerSource, message.NewEntry) - glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) - remoteEntry, writeErr := client.WriteFile(dest, 
message.NewEntry, reader) - if writeErr != nil { - return writeErr - } - return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) - } - - return nil - } - - processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { - lastTime := time.Unix(0, lastTsNs) - glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) - }) - - return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, - "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) -} - -func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { - source := string(sourcePath[len(mountDir):]) - dest := util.FullPath(remoteMountLocation.Path).Child(source) - return &remote_pb.RemoteStorageLocation{ - Name: remoteMountLocation.Name, - Bucket: remoteMountLocation.Bucket, - Path: string(dest), - } -} - -func shouldSendToRemote(entry *filer_pb.Entry) bool { - if entry.RemoteEntry == nil { - return true - } - if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime { + return true + }) return true } - return false -} -func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { - entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: entry, - }) - return err - }) + return true + } diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go new file mode 100644 index 000000000..5fc20be9a --- /dev/null +++ 
b/weed/command/filer_remote_sync_dir.go @@ -0,0 +1,237 @@ +package command + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" +) + +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { + + // read filer remote storage mount mappings + _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir) + if detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) +} + +func 
makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return nil, err + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } + } + if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil { + client = newClient + } else { + return err + } + } + + return nil + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if filer_pb.IsEmpty(resp) { + return nil + } + if filer_pb.IsCreate(resp) { + if !filer.HasData(message.NewEntry) { + return nil + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + 
glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if filer_pb.IsDelete(resp) { + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + if message.NewEntry.IsDirectory { + return client.WriteDirectory(dest, message.NewEntry) + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) + 
} + } + glog.V(2).Infof("update: %+v", resp) + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + + return nil + } + return eachEntryFunc, nil +} + +func retriedWriteFile(client remote_storage.RemoteStorageClient, filerSource *source.FilerSource, newEntry *filer_pb.Entry, dest *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) { + var writeErr error + err = util.Retry("writeFile", func() error { + reader := filer.NewFileReader(filerSource, newEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + remoteEntry, writeErr = client.WriteFile(dest, newEntry, reader) + if writeErr != nil { + return writeErr + } + return nil + }) + if err != nil { + glog.Errorf("write to %s: %v", dest, err) + } + return +} + +func collectLastSyncOffset(filerClient filer_pb.FilerClient, grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, mountedDir string, timeAgo time.Duration) time.Time { + // 1. specified by timeAgo + // 2. last offset timestamp for this directory + // 3. 
directory creation time + var lastOffsetTs time.Time + if timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(filerClient, util.FullPath(mountedDir)) + if err != nil { + glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) + return time.Now() + } + + lastOffsetTsNs, err := remote_storage.GetSyncOffset(grpcDialOption, filerAddress, mountedDir) + if mountedDirEntry != nil { + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } + } else { + lastOffsetTs = time.Now() + } + } else { + lastOffsetTs = time.Now().Add(-timeAgo) + } + return lastOffsetTs +} + +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { + source := string(sourcePath[len(mountDir):]) + dest := util.FullPath(remoteMountLocation.Path).Child(source) + return &remote_pb.RemoteStorageLocation{ + Name: remoteMountLocation.Name, + Bucket: remoteMountLocation.Bucket, + Path: string(dest), + } +} + +func shouldSendToRemote(entry *filer_pb.Entry) bool { + if entry.RemoteEntry == nil { + return true + } + if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime { + return true + } + return false +} + +func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() + entry.RemoteEntry = remoteEntry + return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) +} diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 5440811dd..b7da1baf9 100644 --- a/weed/command/filer_sync.go +++ 
b/weed/command/filer_sync.go @@ -12,9 +12,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" + statsCollect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" + "os" "strings" "time" ) @@ -35,8 +37,12 @@ type SyncOptions struct { bDiskType *string aDebug *bool bDebug *bool + aFromTsMs *int64 + bFromTsMs *int64 aProxyByFiler *bool bProxyByFiler *bool + metricsHttpPort *int + clientId int32 } var ( @@ -64,8 +70,12 @@ func init() { syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") + syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") + syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. 
The unit is millisecond") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") + syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") + syncOptions.clientId = util.RandomInt32() } var cmdFilerSynchronize = &Command{ @@ -93,10 +103,37 @@ func runFilerSynchronize(cmd *Command, args []string) bool { grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) + filerA := pb.ServerAddress(*syncOptions.filerA) + filerB := pb.ServerAddress(*syncOptions.filerB) + + // start filer.sync metrics server + go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort) + + // read a filer signature + aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA) + if aFilerErr != nil { + glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr) + return true + } + // read b filer signature + bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB) + if bFilerErr != nil { + glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr) + return true + } + go func() { + // a->b + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) + os.Exit(2) + } for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, 
*syncOptions.bDiskType, *syncOptions.bDebug) + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, + *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, + *syncOptions.bDebug, aFilerSignature, bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -105,10 +142,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool { }() if !*syncOptions.isActivePassive { + // b->a + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) + os.Exit(2) + } go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug) + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, + *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, + *syncOptions.aDebug, bFilerSignature, aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -122,23 +167,28 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, 
sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error { - - // read source filer signature - sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) - if sourceErr != nil { - return sourceErr +// initOffsetFromTsMs Initialize offset +func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error { + if fromTsMs <= 0 { + return nil } - // read target filer signature - targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler) - if targetErr != nil { - return targetErr + // convert to nanosecond + fromTsNs := fromTsMs * 1000_000 + // If not successful, exit the program. + setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs) + if setOffsetErr != nil { + return setOffsetErr } + glog.Infof("setOffset from timestamp ms success! 
start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB) + return nil +} + +func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) if err != nil { return err } @@ -147,9 +197,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler) + filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) + filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) @@ -165,13 +215,19 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return persistEventFn(resp) } + var lastLogTsNs = time.Now().Nanosecond() + var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), 
string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) - return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) + now := time.Now().Nanosecond() + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + lastLogTsNs = now + // collect synchronous offset + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs)) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, - sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, + sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } @@ -179,9 +235,19 @@ const ( SyncKeyPrefix = "sync." ) -func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { +// When each business is distinguished according to path, and offsets need to be maintained separately. 
+func getSignaturePrefixByPath(path string) string { + // compatible historical version + if path == "/" { + return SyncKeyPrefix + } else { + return SyncKeyPrefix + path + } +} + +func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -206,8 +272,8 @@ func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix str } -func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -255,16 +321,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) - return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + if !dataSink.IsIncremental() { + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, 
message.Signatures) + } + return nil } // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(resp) { if !strings.HasPrefix(string(sourceNewKey), sourcePath) { return nil } @@ -273,7 +342,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // this is something special? - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil } diff --git a/weed/command/filer_sync_std.go b/weed/command/filer_sync_std.go index 63851eaf8..1f9b6fa14 100644 --- a/weed/command/filer_sync_std.go +++ b/weed/command/filer_sync_std.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package command diff --git a/weed/command/fix.go b/weed/command/fix.go index ae9a051b8..d19496a79 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -1,9 +1,12 @@ package command import ( + "fmt" + "io/fs" "os" "path" "strconv" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -19,17 +22,15 @@ func init() { } var cmdFix = &Command{ - UsageLine: "fix -dir=/tmp -volumeId=234", - Short: "run weed tool fix on index file if corrupted", - Long: `Fix runs the SeaweedFS fix command to re-create the index .idx file. - + UsageLine: "fix [-volumeId=234] [-collection=bigData] /tmp", + Short: "run weed tool fix on files or whole folders to recreate index file(s) if corrupted", + Long: `Fix runs the SeaweedFS fix command on dat files or whole folders to re-create the index .idx file. `, } var ( - fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files") - fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name") - fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. 
The volume index file should not exist.") + fixVolumeCollection = cmdFix.Flag.String("collection", "", "an optional volume collection name, if specified only it will be processed") + fixVolumeId = cmdFix.Flag.Int64("volumeId", 0, "an optional volume id, if not 0 (default) only it will be processed") ) type VolumeFileScanner4Fix struct { @@ -59,26 +60,68 @@ func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64 } func runFix(cmd *Command, args []string) bool { - - if *fixVolumeId == -1 { - return false + for _, arg := range args { + basePath, f := path.Split(util.ResolvePath(arg)) + + files := []fs.DirEntry{} + if f == "" { + fileInfo, err := os.ReadDir(basePath) + if err != nil { + fmt.Println(err) + return false + } + files = fileInfo + } else { + fileInfo, err := os.Stat(basePath + f) + if err != nil { + fmt.Println(err) + return false + } + files = []fs.DirEntry{fs.FileInfoToDirEntry(fileInfo)} + } + + for _, file := range files { + if !strings.HasSuffix(file.Name(), ".dat") { + continue + } + if *fixVolumeCollection != "" { + if !strings.HasPrefix(file.Name(), *fixVolumeCollection+"_") { + continue + } + } + baseFileName := file.Name()[:len(file.Name())-4] + collection, volumeIdStr := "", baseFileName + if sepIndex := strings.LastIndex(baseFileName, "_"); sepIndex > 0 { + collection = baseFileName[:sepIndex] + volumeIdStr = baseFileName[sepIndex+1:] + } + volumeId, parseErr := strconv.ParseInt(volumeIdStr, 10, 64) + if parseErr != nil { + fmt.Printf("Failed to parse volume id from %s: %v\n", baseFileName, parseErr) + return false + } + if *fixVolumeId != 0 && *fixVolumeId != volumeId { + continue + } + doFixOneVolume(basePath, baseFileName, collection, volumeId) + } } + return true +} - baseFileName := strconv.Itoa(*fixVolumeId) - if *fixVolumeCollection != "" { - baseFileName = *fixVolumeCollection + "_" + baseFileName - } - indexFileName := path.Join(util.ResolvePath(*fixVolumePath), baseFileName+".idx") +func doFixOneVolume(basepath 
string, baseFileName string, collection string, volumeId int64) { + + indexFileName := path.Join(basepath, baseFileName+".idx") nm := needle_map.NewMemDb() defer nm.Close() - vid := needle.VolumeId(*fixVolumeId) + vid := needle.VolumeId(volumeId) scanner := &VolumeFileScanner4Fix{ nm: nm, } - if err := storage.ScanVolumeFile(util.ResolvePath(*fixVolumePath), *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil { + if err := storage.ScanVolumeFile(basepath, collection, vid, storage.NeedleMapInMemory, scanner); err != nil { glog.Fatalf("scan .dat File: %v", err) os.Remove(indexFileName) } @@ -87,6 +130,4 @@ func runFix(cmd *Command, args []string) bool { glog.Fatalf("save to .idx File: %v", err) os.Remove(indexFileName) } - - return true } diff --git a/weed/command/iam.go b/weed/command/iam.go index ed4eea543..968d23095 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -22,6 +22,7 @@ var ( type IamOptions struct { filer *string masters *string + ip *string port *int } @@ -29,6 +30,7 @@ func init() { cmdIam.Run = runIam // break init cycle iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address") iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers") + iamStandaloneOptions.ip = cmdIam.Flag.String("ip", util.DetectedHostAddress(), "iam server http listen ip address") iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port") } @@ -43,38 +45,35 @@ func runIam(cmd *Command, args []string) bool { } func (iamopt *IamOptions) startIamServer() bool { - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*iamopt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*iamopt.filer) util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client 
filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } glog.V(0).Infof("IAM read filer configuration: %s", resp) return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) break } } + masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap() router := mux.NewRouter().SkipClean(true) _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ - Filer: *iamopt.filer, - Port: *iamopt.port, - FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: grpcDialOption, + Masters: masters, + Filer: filerAddress, + Port: *iamopt.port, + GrpcDialOption: grpcDialOption, }) glog.V(0).Info("NewIamApiServer created") if iamApiServer_err != nil { @@ -84,12 +83,19 @@ func (iamopt *IamOptions) startIamServer() bool { httpS := &http.Server{Handler: router} listenAddress := fmt.Sprintf(":%d", *iamopt.port) - iamApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) + iamApiListener, iamApiLocalListener, err := util.NewIpAndLocalListeners(*iamopt.ip, *iamopt.port, time.Duration(10)*time.Second) if err != nil { glog.Fatalf("IAM API Server listener on %s error: %v", listenAddress, err) } glog.V(0).Infof("Start Seaweed IAM API Server %s at http port %d", util.Version(), 
*iamopt.port) + if iamApiLocalListener != nil { + go func() { + if err = httpS.Serve(iamApiLocalListener); err != nil { + glog.Errorf("IAM API Server Fail to serve: %v", err) + } + }() + } if err = httpS.Serve(iamApiListener); err != nil { glog.Fatalf("IAM API Server Fail to serve: %v", err) } diff --git a/weed/command/imports.go b/weed/command/imports.go index a2f59189f..afdbc5a10 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -5,7 +5,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" @@ -15,6 +14,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" + _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" @@ -29,6 +29,8 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" _ "github.com/chrislusf/seaweedfs/weed/filer/tikv" + _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" ) diff --git a/weed/command/master.go b/weed/command/master.go index 4eb43ee09..ab8466d47 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,23 +1,27 @@ package command import ( - "github.com/chrislusf/raft/protobuf" - "github.com/gorilla/mux" - "google.golang.org/grpc/reflection" + "fmt" + "golang.org/x/exp/slices" "net/http" "os" - "sort" - "strconv" + "path" "strings" "time" + 
"github.com/chrislusf/raft/protobuf" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/gorilla/mux" + "github.com/spf13/viper" + "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -28,6 +32,7 @@ var ( type MasterOptions struct { port *int + portGrpc *int ip *string ipBind *string metaFolder *string @@ -42,13 +47,19 @@ type MasterOptions struct { metricsAddress *string metricsIntervalSec *int raftResumeState *bool + metricsHttpPort *int + heartbeatInterval *time.Duration + electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { cmdMaster.Run = runMaster // break init cycle m.port = cmdMaster.Flag.Int("port", 9333, "http listen port") + m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port") m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier") - m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to") + m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to. 
If empty, default to same as -ip option.") m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") @@ -60,7 +71,12 @@ func init() { m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") + m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") + m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") + m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -103,6 +119,7 @@ func runMaster(cmd *Command, args []string) bool { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } + go stats_collect.StartMetricsServer(*m.metricsHttpPort) startMaster(m, masterWhiteList) return true @@ -112,65 +129,139 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { backend.LoadConfiguration(util.GetViper()) - myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) + if 
*masterOption.portGrpc == 0 { + *masterOption.portGrpc = 10000 + *masterOption.port + } + if *masterOption.ipBind == "" { + *masterOption.ipBind = *masterOption.ip + } + + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) + + masterPeers := make(map[string]pb.ServerAddress) + for _, peer := range peers { + masterPeers[string(peer)] = peer + } r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) - listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port) + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers) + listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) - masterListener, e := util.NewListener(listeningAddress, 0) + masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } + // start raftServer - raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"), - peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port)) + raftServerOption := &weed_server.RaftServerOption{ + GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), + Peers: masterPeers, + ServerAddr: myMasterAddress, + DataDir: util.ResolvePath(metaDir), + Topo: ms.Topo, + RaftResumeState: *masterOption.raftResumeState, + HeartbeatInterval: *masterOption.heartbeatInterval, + ElectionTimeout: *masterOption.electionTimeout, + 
RaftBootstrap: *m.raftBootstrap, + } + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + } // starting grpc server - grpcPort := *masterOption.port + 10000 - grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *masterOption.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if 
ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() // start http server + var ( + clientCertFile, + certFile, + keyFile string + ) + useTLS := false + useMTLS := false + + if viper.GetString("https.master.key") != "" { + useTLS = true + certFile = viper.GetString("https.master.cert") + keyFile = viper.GetString("https.master.key") + } + + if viper.GetString("https.master.ca") != "" { + useMTLS = true + clientCertFile = viper.GetString("https.master.ca") + } + httpS := &http.Server{Handler: r} - go httpS.Serve(masterListener) + if masterLocalListner != nil { + go httpS.Serve(masterLocalListner) + } + + if useMTLS { + httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile) + } + + if useTLS { + go httpS.ServeTLS(masterListener, certFile, keyFile) + } else { + go httpS.Serve(masterListener) + } select {} } -func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { +func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) { glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers) - masterAddress = masterIp + ":" + strconv.Itoa(masterPort) - if peers != "" { - cleanedPeers = strings.Split(peers, ",") - } + masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort) + cleanedPeers = pb.ServerAddresses(peers).ToAddresses() hasSelf := false for _, peer := range cleanedPeers { - if peer == masterAddress { + if peer.ToHttpAddress() == masterAddress.ToHttpAddress() { hasSelf = true break } @@ -180,13 +271,15 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st cleanedPeers = append(cleanedPeers, masterAddress) } if len(cleanedPeers)%2 == 0 { - 
glog.Fatalf("Only odd number of masters are supported!") + glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers) } return } -func isTheFirstOne(self string, peers []string) bool { - sort.Strings(peers) +func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { + slices.SortFunc(peers, func(a, b pb.ServerAddress) bool { + return strings.Compare(string(a), string(b)) < 0 + }) if len(peers) <= 0 { return true } @@ -194,9 +287,9 @@ func isTheFirstOne(self string, peers []string) bool { } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { + masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc) return &weed_server.MasterOption{ - Host: *m.ip, - Port: *m.port, + Master: masterAddress, MetaFolder: *m.metaFolder, VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), VolumePreallocate: *m.volumePreallocate, diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index b628f7abf..ec7d2758f 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -3,19 +3,18 @@ package command import ( "context" "fmt" + "net/http" + "time" + "github.com/aws/aws-sdk-go/aws" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" "google.golang.org/grpc/reflection" - "net/http" - "strconv" - "strings" - "time" ) var ( @@ -25,7 +24,8 @@ var ( func init() { cmdMasterFollower.Run = runMasterFollower // break init cycle mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port") - mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to") + mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 0, "grpc listen port") + mf.ipBind = 
cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") mf.ip = aws.String(util.DetectedHostAddress()) @@ -46,13 +46,13 @@ var cmdMasterFollower = &Command{ Short: "start a master follower", Long: `start a master follower to provide volume=>location mapping service - The master follower does not participate in master election. + The master follower does not participate in master election. It just follow the existing masters, and listen for any volume location changes. In most cases, the master follower is not needed. In big data centers with thousands of volume servers. In theory, the master may have trouble to keep up with the write requests and read requests. - The master follower can relieve the master from from read requests, which only needs to + The master follower can relieve the master from read requests, which only needs to 
The master follower currently can handle fileId lookup requests: @@ -71,6 +71,10 @@ func runMasterFollower(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) util.LoadConfiguration("master", false) + if *mf.portGrpc == 0 { + *mf.portGrpc = 10000 + *mf.port + } + startMasterFollower(mf) return true @@ -79,19 +83,15 @@ func runMasterFollower(cmd *Command, args []string) bool { func startMasterFollower(masterOptions MasterOptions) { // collect settings from main masters - masters := strings.Split(*mf.peers, ",") - masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters) - if err != nil { - glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) - return - } + masters := pb.ServerAddresses(*mf.peers).ToAddressMap() + var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { - return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err) + return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) } masterOptions.defaultReplication = &resp.DefaultReplication masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB)) @@ -99,31 +99,35 @@ func startMasterFollower(masterOptions MasterOptions) { return nil }) if err != nil { - glog.V(0).Infof("failed to talk to filer %v: %v", masterGrpcAddresses, err) + glog.V(0).Infof("failed to talk to filer %v: %v", masters, err) glog.V(0).Infof("wait for %d seconds ...", i+1) time.Sleep(time.Duration(i+1) * time.Second) } } if err != nil { - glog.Errorf("failed to talk to filer %v: %v", masterGrpcAddresses, err) + glog.Errorf("failed 
to talk to filer %v: %v", masters, err) return } option := masterOptions.toMasterOption(nil) option.IsFollower = true + if *masterOptions.ipBind == "" { + *masterOptions.ipBind = *masterOptions.ip + } + r := mux.NewRouter() ms := weed_server.NewMasterServer(r, option, masters) - listeningAddress := *masterOptions.ipBind + ":" + strconv.Itoa(*masterOptions.port) + listeningAddress := util.JoinHostPort(*masterOptions.ipBind, *masterOptions.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) - masterListener, e := util.NewListener(listeningAddress, 0) + masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOptions.ipBind, *masterOptions.port, 0) if e != nil { glog.Fatalf("Master startup error: %v", e) } // starting grpc server - grpcPort := *masterOptions.port + 10000 - grpcL, err := util.NewListener(*masterOptions.ipBind+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *masterOptions.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOptions.ipBind, grpcPort, 0) if err != nil { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } @@ -131,12 +135,18 @@ func startMasterFollower(masterOptions MasterOptions) { master_pb.RegisterSeaweedServer(grpcS, ms) reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } go grpcS.Serve(grpcL) go ms.MasterClient.KeepConnectedToMaster() // start http server httpS := &http.Server{Handler: r} + if masterLocalListner != nil { + go httpS.Serve(masterLocalListner) + } go httpS.Serve(masterListener) select {} diff --git a/weed/command/mount.go b/weed/command/mount.go index aec5fcc3c..0046ca03d 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -11,6 +11,7 @@ type MountOptions struct { dir *string dirAutoCreate *bool collection *string + collectionQuota *int replication *string diskType *string 
ttlSec *int @@ -26,6 +27,10 @@ type MountOptions struct { uidMap *string gidMap *string readOnly *bool + debug *bool + debugPort *int + localSocket *string + disableXAttr *bool } var ( @@ -42,13 +47,14 @@ func init() { mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") + mountOptions.collectionQuota = cmdMount.Flag.Int("collectionQuotaMB", 0, "quota for the collection") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0") mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") - mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)") + mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 0, "local file chunk cache capacity in MB") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") @@ -57,6 +63,10 @@ func init() { mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local 
uid to uid on filer, comma-separated <local_uid>:<filer_uid>") mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>") mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") + mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") + mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") + mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock") + mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") @@ -76,7 +86,7 @@ var cmdMount = &Command{ This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on Linux, and OS X. - On OS X, it requires OSXFUSE (http://osxfuse.github.com/). + On OS X, it requires OSXFUSE (https://osxfuse.github.io/). 
`, } diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go index f0a5581e7..05d6a1bc4 100644 --- a/weed/command/mount_darwin.go +++ b/weed/command/mount_darwin.go @@ -1,13 +1,5 @@ package command -import ( - "github.com/seaweedfs/fuse" -) - -func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{} -} - func checkMountPointAvailable(dir string) bool { return true } diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go deleted file mode 100644 index f0a5581e7..000000000 --- a/weed/command/mount_freebsd.go +++ /dev/null @@ -1,13 +0,0 @@ -package command - -import ( - "github.com/seaweedfs/fuse" -) - -func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{} -} - -func checkMountPointAvailable(dir string) bool { - return true -} diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go index 25c4f72cf..aebb14e61 100644 --- a/weed/command/mount_linux.go +++ b/weed/command/mount_linux.go @@ -6,8 +6,6 @@ import ( "io" "os" "strings" - - "github.com/seaweedfs/fuse" ) const ( @@ -137,10 +135,6 @@ func parseInfoFile(r io.Reader) ([]*Info, error) { return out, nil } -func osSpecificMountOptions() []fuse.MountOption { - return []fuse.MountOption{} -} - func checkMountPointAvailable(dir string) bool { mountPoint := dir if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") { diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go index f3c0de3d6..894c8e313 100644 --- a/weed/command/mount_notsupported.go +++ b/weed/command/mount_notsupported.go @@ -1,6 +1,5 @@ -// +build !linux -// +build !darwin -// +build !freebsd +//go:build !linux && !darwin +// +build !linux,!darwin package command diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index cdf340067..1aff3c5bb 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -1,38 +1,41 @@ -// +build linux darwin freebsd +//go:build linux || darwin +// +build linux darwin 
package command import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mount" + "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" + "github.com/chrislusf/seaweedfs/weed/mount/unmount" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/hanwen/go-fuse/v2/fuse" + "google.golang.org/grpc/reflection" + "net" + "net/http" "os" "os/user" - "path" - "path/filepath" "runtime" "strconv" "strings" - "syscall" "time" - "github.com/chrislusf/seaweedfs/weed/storage/types" - - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" - - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - - "github.com/chrislusf/seaweedfs/weed/filesys" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" ) func runMount(cmd *Command, args []string) bool { + if *mountOptions.debug { + go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil) + } + grace.SetupProfiling(*mountCpuProfile, *mountMemProfile) if *mountReadRetryTime < time.Second { *mountReadRetryTime = time.Second @@ -52,76 +55,67 @@ func runMount(cmd *Command, args []string) bool { return RunMount(&mountOptions, os.FileMode(umask)) } -func getParentInode(mountDir string) (uint64, error) { - parentDir := filepath.Clean(filepath.Join(mountDir, "..")) - fi, err := os.Stat(parentDir) - if err != nil { - return 0, err - } - - stat, ok := fi.Sys().(*syscall.Stat_t) - if !ok { - return 0, nil - } - - return stat.Ino, nil -} - func RunMount(option *MountOptions, umask os.FileMode) bool { - filers := strings.Split(*option.filer, 
",") - // parse filer grpc address - filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers) - if err != nil { - glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) - return true + // basic checks + chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB + if chunkSizeLimitMB <= 0 { + fmt.Printf("Please specify a reasonable buffer size.") + return false } + // try to connect to filer + filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses() util.LoadConfiguration("security", false) - // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool + var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err) + return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err) + glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err) glog.V(0).Infof("wait for %d seconds ...", i+1) time.Sleep(time.Duration(i+1) * time.Second) } } if err != nil { - glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err) + glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err) return true } filerMountRootPath := *option.filerMountRootPath - dir := util.ResolvePath(*option.dir) - parentInode, err := getParentInode(dir) - if err != nil { - glog.Errorf("failed to retrieve inode for parent directory of %s: %v", dir, err) - return true - } - fmt.Printf("This is SeaweedFS 
version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) + // clean up mount point + dir := util.ResolvePath(*option.dir) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") return false } - chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB - if chunkSizeLimitMB <= 0 { - fmt.Printf("Please specify a reasonable buffer size.") - return false - } + unmount.Unmount(dir) - fuse.Unmount(dir) + // start on local unix socket + if *option.localSocket == "" { + mountDirHash := util.HashToInt32([]byte(dir)) + if mountDirHash < 0 { + mountDirHash = -mountDirHash + } + *option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash) + } + if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error()) + } + montSocketListener, err := net.Listen("unix", *option.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err) + } // detect mount folder mode if *option.dirAutoCreate { @@ -129,6 +123,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } fileInfo, err := os.Stat(dir) + // collect uid, gid uid, gid := uint32(0), uint32(0) mountMode := os.ModeDir | 0777 if err == nil { @@ -140,6 +135,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { return false } + // detect uid, gid if uid == 0 { if u, err := user.Current(); err == nil { if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil { @@ -165,34 +161,51 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { return true } - mountName := path.Base(dir) - - options := []fuse.MountOption{ - fuse.VolumeName(mountName), - fuse.FSName(*option.filer + ":" + filerMountRootPath), - fuse.Subtype("seaweedfs"), - // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders - fuse.NoAppleXattr(), - fuse.ExclCreate(), - fuse.DaemonTimeout("3600"), - fuse.AllowSUID(), - 
fuse.DefaultPermissions(), - fuse.MaxReadahead(1024 * 128), - fuse.AsyncRead(), - fuse.WritebackCache(), - fuse.MaxBackground(128), - fuse.CongestionThreshold(128), - } - - options = append(options, osSpecificMountOptions()...) - if *option.allowOthers { - options = append(options, fuse.AllowOther()) + serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+") + + // mount fuse + fuseMountOptions := &fuse.MountOptions{ + AllowOther: *option.allowOthers, + Options: nil, + MaxBackground: 128, + MaxWrite: 1024 * 1024 * 2, + MaxReadAhead: 1024 * 1024 * 2, + IgnoreSecurityLabels: false, + RememberInodes: false, + FsName: serverFriendlyName + ":" + filerMountRootPath, + Name: "seaweedfs", + SingleThreaded: false, + DisableXAttrs: *option.disableXAttr, + Debug: *option.debug, + EnableLocks: false, + ExplicitDataCacheControl: false, + DirectMount: true, + DirectMountFlags: 0, + //SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability + //EnableAcl: true, } if *option.nonempty { - options = append(options, fuse.AllowNonEmptyMount()) + fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty") } if *option.readOnly { - options = append(options, fuse.ReadOnly()) + if runtime.GOOS == "darwin" { + fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly") + } else { + fuseMountOptions.Options = append(fuseMountOptions.Options, "ro") + } + } + if runtime.GOOS == "darwin" { + // https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options + ioSizeMB := 1 + for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 { + ioSizeMB *= 2 + } + fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600") + fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr") + // fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness + fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs") + fuseMountOptions.Options = 
append(fuseMountOptions.Options, "volname="+serverFriendlyName) + fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024)) } // find mount point @@ -201,60 +214,51 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { mountRoot = mountRoot[0 : len(mountRoot)-1] } - diskType := types.ToDiskType(*option.diskType) - - seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ + seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{ MountDirectory: dir, - FilerAddresses: filers, - FilerGrpcAddresses: filerGrpcAddresses, + FilerAddresses: filerAddresses, GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, Collection: *option.collection, Replication: *option.replication, TtlSec: int32(*option.ttlSec), - DiskType: diskType, + DiskType: types.ToDiskType(*option.diskType), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, ConcurrentWriters: *option.concurrentWriters, CacheDir: *option.cacheDir, CacheSizeMB: *option.cacheSizeMB, DataCenter: *option.dataCenter, + Quota: int64(*option.collectionQuota) * 1024 * 1024, MountUid: uid, MountGid: gid, MountMode: mountMode, MountCtime: fileInfo.ModTime(), MountMtime: time.Now(), - MountParentInode: parentInode, Umask: umask, VolumeServerAccess: *mountOptions.volumeServerAccess, Cipher: cipher, UidGidMapper: uidGidMapper, + DisableXAttr: *option.disableXAttr, }) - // mount - c, err := fuse.Mount(dir, options...) 
+ server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions) if err != nil { - glog.V(0).Infof("mount: %v", err) - return true + glog.Fatalf("Mount fail: %v", err) } - defer fuse.Unmount(dir) - grace.OnInterrupt(func() { - fuse.Unmount(dir) - c.Close() + unmount.Unmount(dir) }) - glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir) - server := fs.New(c, nil) - seaweedFileSystem.Server = server + grpcS := pb.NewGrpcServer() + mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem) + reflection.Register(grpcS) + go grpcS.Serve(montSocketListener) + seaweedFileSystem.StartBackgroundTasks() - err = server.Serve(seaweedFileSystem) - // check if the mount process has an error to report - <-c.Ready - if err := c.MountError; err != nil { - glog.V(0).Infof("mount process: %v", err) - return true - } + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) + + server.Serve() return true } diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index db0b4148d..3274f599b 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -3,7 +3,6 @@ package command import ( "context" "fmt" - "strconv" "time" "google.golang.org/grpc/reflection" @@ -63,35 +62,31 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") cipher := false for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := 
client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) break } } qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ - Filers: []string{*msgBrokerOpt.filer}, + Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, Ip: *msgBrokerOpt.ip, @@ -100,7 +95,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { }, grpcDialOption) // start grpc listener - grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) } diff --git a/weed/command/s3.go b/weed/command/s3.go index c8292a7d5..42e447d90 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -3,11 +3,14 @@ package command import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "google.golang.org/grpc/reflection" "net/http" "time" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/gorilla/mux" @@ -23,26 +26,35 @@ var ( ) type S3Options struct { - filer *string - port *int - config *string - domainName *string - 
tlsPrivateKey *string - tlsCertificate *string - metricsHttpPort *int - allowEmptyFolder *bool + filer *string + bindIp *string + port *int + portGrpc *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string + metricsHttpPort *int + allowEmptyFolder *bool + allowDeleteBucketNotEmpty *bool + auditLogConfig *string + localFilerSocket *string } func init() { cmdS3.Run = runS3 // break init cycle s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") + s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") + s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") + s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file") s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") - s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders") + s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders") + s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") } var cmdS3 = &Command{ @@ -137,11 +149,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - 
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*s3opt.filer) filerBucketsPath := "/buckets" @@ -152,10 +160,10 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } filerBucketsPath = resp.DirBuckets metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec) @@ -163,10 +171,10 @@ func (s3opt *S3Options) startS3Server() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) break } } @@ -175,15 +183,16 @@ func (s3opt *S3Options) startS3Server() bool { router := mux.NewRouter().SkipClean(true) - _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: *s3opt.filer, - Port: *s3opt.port, - FilerGrpcAddress: filerGrpcAddress, - Config: *s3opt.config, - DomainName: *s3opt.domainName, - BucketsPath: filerBucketsPath, - GrpcDialOption: grpcDialOption, - AllowEmptyFolder: *s3opt.allowEmptyFolder, + s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, 
&s3api.S3ApiServerOption{ + Filer: filerAddress, + Port: *s3opt.port, + Config: *s3opt.config, + DomainName: *s3opt.domainName, + BucketsPath: filerBucketsPath, + GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, + AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, + LocalFilerSocket: s3opt.localFilerSocket, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) @@ -191,19 +200,61 @@ func (s3opt *S3Options) startS3Server() bool { httpS := &http.Server{Handler: router} - listenAddress := fmt.Sprintf(":%d", *s3opt.port) - s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) + if *s3opt.portGrpc == 0 { + *s3opt.portGrpc = 10000 + *s3opt.port + } + if *s3opt.bindIp == "" { + *s3opt.bindIp = "localhost" + } + + listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port) + s3ApiListener, s3ApiLocalListner, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second) if err != nil { glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err) } + if len(*s3opt.auditLogConfig) > 0 { + s3err.InitAuditLog(*s3opt.auditLogConfig) + if s3err.Logger != nil { + defer s3err.Logger.Close() + } + } + + // starting grpc server + grpcPort := *s3opt.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0) + if err != nil { + glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3")) + s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer) + reflection.Register(grpcS) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } + go grpcS.Serve(grpcL) + if *s3opt.tlsPrivateKey != "" { glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port) + if s3ApiLocalListner != nil { + go func() { + if err = httpS.ServeTLS(s3ApiLocalListner, *s3opt.tlsCertificate, 
*s3opt.tlsPrivateKey); err != nil { + glog.Fatalf("S3 API Server Fail to serve: %v", err) + } + }() + } if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } } else { glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port) + if s3ApiLocalListner != nil { + go func() { + if err = httpS.Serve(s3ApiLocalListner); err != nil { + glog.Fatalf("S3 API Server Fail to serve: %v", err) + } + }() + } if err = httpS.Serve(s3ApiListener); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 886c0ac5e..fb81f9966 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -2,9 +2,10 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/command/scaffold" - "io/ioutil" + "github.com/chrislusf/seaweedfs/weed/util" "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/command/scaffold" ) func init() { @@ -55,7 +56,7 @@ func runScaffold(cmd *Command, args []string) bool { } if *outputPath != "" { - ioutil.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644) + util.WriteFile(filepath.Join(*outputPath, *config+".toml"), []byte(content), 0644) } else { fmt.Println(content) } diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index c8cb70131..c82de8da0 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -44,7 +44,7 @@ dbFile = "./filer.db" # sqlite db file # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', # name VARCHAR(1000) BINARY COMMENT 'directory or file name', -# directory TEXT COMMENT 'full path to parent directory', +# directory TEXT BINARY COMMENT 'full path to parent directory', # meta LONGBLOB, # PRIMARY KEY (dirhash, name) # ) DEFAULT CHARSET=utf8; @@ -61,15 +61,15 @@ 
connection_max_lifetime_seconds = 0 interpolateParams = false # if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: enableUpsert = true -upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" +upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" [mysql2] # or memsql, tidb enabled = false createTable = """ - CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` ( + CREATE TABLE IF NOT EXISTS `%s` ( dirhash BIGINT, name VARCHAR(1000) BINARY, - directory TEXT, + directory TEXT BINARY, meta LONGBLOB, PRIMARY KEY (dirhash, name) ) DEFAULT CHARSET=utf8; @@ -85,7 +85,7 @@ connection_max_lifetime_seconds = 0 interpolateParams = false # if insert/upsert failing, you can disable upsert or update query syntax to match your RDBMS syntax: enableUpsert = true -upsertQuery = """INSERT INTO ` + "`%s`" + ` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" +upsertQuery = """INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE meta = VALUES(meta)""" [postgres] # or cockroachdb, YugabyteDB # CREATE TABLE IF NOT EXISTS filemeta ( @@ -153,6 +153,8 @@ password = "" superLargeDirectories = [] # Name of the datacenter local to this filer, used as host selection fallback. localDC = "" +# Gocql connection timeout, default: 600ms +connection_timeout_millisecond = 600 [hbase] enabled = false @@ -167,6 +169,14 @@ database = 0 # This changes the data layout. Only add new directories. Removing/Updating will cause data loss. 
superLargeDirectories = [] +[redis2_sentinel] +enabled = false +addresses = ["172.22.12.7:26379","172.22.12.8:26379","172.22.12.9:26379"] +masterName = "master" +username = "" +password = "" +database = 0 + [redis_cluster2] enabled = false addresses = [ @@ -185,6 +195,70 @@ routeByLatency = false # This changes the data layout. Only add new directories. Removing/Updating will cause data loss. superLargeDirectories = [] +[redis_lua] +enabled = false +address = "localhost:6379" +password = "" +database = 0 +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] + +[redis_lua_sentinel] +enabled = false +addresses = ["172.22.12.7:26379","172.22.12.8:26379","172.22.12.9:26379"] +masterName = "master" +username = "" +password = "" +database = 0 + +[redis_lua_cluster] +enabled = false +addresses = [ + "localhost:30001", + "localhost:30002", + "localhost:30003", + "localhost:30004", + "localhost:30005", + "localhost:30006", +] +password = "" +# allows reads from slave servers or the master, but all writes still go to the master +readOnly = false +# automatically use the closest Redis server for reads +routeByLatency = false +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. 
+superLargeDirectories = [] + +[redis3] # beta +enabled = false +address = "localhost:6379" +password = "" +database = 0 + +[redis3_sentinel] +enabled = false +addresses = ["172.22.12.7:26379","172.22.12.8:26379","172.22.12.9:26379"] +masterName = "master" +username = "" +password = "" +database = 0 + +[redis_cluster3] # beta +enabled = false +addresses = [ + "localhost:30001", + "localhost:30002", + "localhost:30003", + "localhost:30004", + "localhost:30005", + "localhost:30006", +] +password = "" +# allows reads from slave servers or the master, but all writes still go to the master +readOnly = false +# automatically use the closest Redis server for reads +routeByLatency = false + [etcd] enabled = false servers = "localhost:2379" @@ -211,6 +285,29 @@ healthcheck_enabled = false index.max_result_window = 10000 +[arangodb] # in development dont use it +enabled = false +db_name = "seaweedfs" +servers=["http://localhost:8529"] # list of servers to connect to +# only basic auth supported for now +username="" +password="" +# skip tls cert validation +insecure_skip_verify = true + +[ydb] # https://ydb.tech/ +enabled = false +dsn = "grpc://localhost:2136?database=/local" +prefix = "seaweedfs" +useBucketPrefix = true # Fast Bucket Deletion +poolSizeLimit = 50 +dialTimeOut = 10 + +# Authenticate produced with one of next environment variables: +# YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file> — used service account key file by path +# YDB_ANONYMOUS_CREDENTIALS="1" — used for authenticate with anonymous access. Anonymous access needs for connect to testing YDB installation +# YDB_METADATA_CREDENTIALS="1" — used metadata service for authenticate to YDB from yandex cloud virtual machine or from yandex function +# YDB_ACCESS_TOKEN_CREDENTIALS=<access_token> — used for authenticate to YDB with short-life access token. 
For example, access token may be IAM token ########################## ########################## @@ -238,3 +335,5 @@ enabled = false pdaddrs = "localhost:2379" # Concurrency for TiKV delete range deleterange_concurrency = 1 +# Enable 1PC +enable_1pc = false diff --git a/weed/command/scaffold/master.toml b/weed/command/scaffold/master.toml index 020f48e36..10d9d1914 100644 --- a/weed/command/scaffold/master.toml +++ b/weed/command/scaffold/master.toml @@ -14,19 +14,14 @@ scripts = """ volume.deleteEmpty -quietFor=24h -force volume.balance -force volume.fix.replication + s3.clean.uploads -timeAgo=24h unlock """ sleep_minutes = 17 # sleep minutes between each script execution -[master.filer] -default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands - [master.sequencer] -type = "raft" # Choose [raft|etcd|snowflake] type for storing the file id sequence -# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence -# example : http://127.0.0.1:2379,http://127.0.0.1:2389 -sequencer_etcd_urls = "http://127.0.0.1:2379" +type = "raft" # Choose [raft|snowflake] type for storing the file id sequence # when sequencer.type = snowflake, the snowflake id must be different from other masters sequencer_snowflake_id = 0 # any number between 1~1023 @@ -41,6 +36,7 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials fil region = "us-east-2" bucket = "your_bucket_name" # an existing bucket endpoint = "" +storage_class = "STANDARD_IA" # create this number of logical volumes if no more writable volumes # count_x means how many copies of data. 
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index 0c69b2f24..e5452cdff 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -4,17 +4,46 @@ # /etc/seaweedfs/security.toml # this file is read by master, volume server, and filer -# the jwt signing key is read by master and volume server. -# a jwt defaults to expire after 10 seconds. +# this jwt signing key is read by master and volume server, and it is used for write operations: +# - the Master server generates the JWT, which can be used to write a certain file on a volume server +# - the Volume server validates the JWT on writing +# the jwt defaults to expire after 10 seconds. [jwt.signing] key = "" expires_after_seconds = 10 # seconds -# jwt for read is only supported with master+volume setup. Filer does not support this mode. +# by default, if the signing key above is set, the Volume UI over HTTP is disabled. +# by setting ui.access to true, you can re-enable the Volume UI. Despite +# some information leakage (as the UI is not authenticated), this should not +# pose a security risk. +[access] +ui = false + +# this jwt signing key is read by master and volume server, and it is used for read operations: +# - the Master server generates the JWT, which can be used to read a certain file on a volume server +# - the Volume server validates the JWT on reading +# NOTE: jwt for read is only supported with master+volume setup. Filer does not support this mode. [jwt.signing.read] key = "" expires_after_seconds = 10 # seconds + +# If this JWT key is configured, Filer only accepts writes over HTTP if they are signed with this JWT: +# - f.e. the S3 API Shim generates the JWT +# - the Filer server validates the JWT on writing +# the jwt defaults to expire after 10 seconds. 
+[jwt.filer_signing] +key = "" +expires_after_seconds = 10 # seconds + +# If this JWT key is configured, Filer only accepts reads over HTTP if they are signed with this JWT: +# - f.e. the S3 API Shim generates the JWT +# - the Filer server validates the JWT on writing +# the jwt defaults to expire after 10 seconds. +[jwt.filer_signing.read] +key = "" +expires_after_seconds = 10 # seconds + # all grpc tls authentications are mutual # the values for the following ca, cert, and key are paths to the PERM files. # the host name is not checked, so the PERM files can be shared. @@ -38,6 +67,11 @@ cert = "" key = "" allowed_commonNames = "" # comma-separated SSL certificate common names +[grpc.s3] +cert = "" +key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names + [grpc.msg_broker] cert = "" key = "" @@ -54,7 +88,13 @@ key = "" # this does not work with other clients, e.g., "weed filer|mount" etc, yet. [https.client] enabled = true + [https.volume] cert = "" key = "" +ca = "" +[https.master] +cert = "" +key = "" +ca = "" diff --git a/weed/command/server.go b/weed/command/server.go index c784d90b9..b1812bb9b 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,7 +2,6 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util/grace" "net/http" "os" "strings" @@ -11,7 +10,9 @@ import ( stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" ) type ServerOptions struct { @@ -27,6 +28,7 @@ var ( masterOptions MasterOptions filerOptions FilerOptions s3Options S3Options + iamOptions IamOptions webdavOptions WebDavOption msgBrokerOptions MessageBrokerOptions ) @@ -53,14 +55,14 @@ var cmdServer = &Command{ var ( serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") - serverBindIp = 
cmdServer.Flag.String("ip.bind", "", "ip address to bind to") + serverBindIp = cmdServer.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds") serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") serverDisableHttp = cmdServer.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.") volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.") + volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured as free disk space divided by volume size.") volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).") volumeMinFreeSpace = cmdServer.Flag.String("volume.minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). 
Low disk space will mark all volumes as ReadOnly.") serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") @@ -70,6 +72,7 @@ var ( isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") + isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") @@ -85,6 +88,7 @@ func init() { serverOptions.debugPort = cmdServer.Flag.Int("debug.port", 6060, "http port for debugging") masterOptions.port = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") + masterOptions.portGrpc = cmdServer.Flag.Int("master.port.grpc", 0, "master server grpc listen port") masterOptions.metaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") masterOptions.peers = cmdServer.Flag.String("master.peers", "", "all master nodes in comma separated ip:masterPort list") masterOptions.volumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") @@ -94,20 +98,26 @@ func init() { masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address") masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server") + masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 
1.25)") + masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") + filerOptions.filerGroup = cmdServer.Flag.String("filer.filerGroup", "", "share metadata with other filers in the same filerGroup") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") + filerOptions.portGrpc = cmdServer.Flag.Int("filer.port.grpc", 0, "filer server grpc listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 4, "split files larger than the limit") filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") - filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") + filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock") + filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button") serverOptions.v.port = 
cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") + serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") @@ -122,13 +132,19 @@ func init() { serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port") + serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") + s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") - s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") + s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") + s3Options.allowEmptyFolder = 
cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders") + s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") + + iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port") webdavOptions.port = cmdServer.Flag.Int("webdav.port", 7333, "webdav server http listen port") webdavOptions.collection = cmdServer.Flag.String("webdav.collection", "", "collection to create the files") @@ -137,7 +153,7 @@ func init() { webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file") webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") - webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB") + webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") @@ -157,6 +173,9 @@ func runServer(cmd *Command, args []string) bool { if *isStartingS3 { *isStartingFiler = true } + if *isStartingIam { + *isStartingFiler = true + } if *isStartingWebDav { *isStartingFiler = true } @@ -165,20 +184,27 @@ func runServer(cmd *Command, args []string) bool { } if *isStartingMasterServer { - _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) - peers := strings.Join(peerList, ",") + _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers) + peers := strings.Join(pb.ToAddressStrings(peerList), ",") masterOptions.peers = &peers } + if *serverBindIp == "" { + serverBindIp = serverIp + } + // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - 
filerOptions.masters = masterOptions.peers + filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap() filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp + s3Options.bindIp = serverBindIp + iamOptions.ip = serverBindIp + iamOptions.masters = masterOptions.peers serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = masterOptions.peers + serverOptions.v.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses() serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack @@ -194,8 +220,9 @@ func runServer(cmd *Command, args []string) bool { filerOptions.disableHttp = serverDisableHttp masterOptions.disableHttp = serverDisableHttp - filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port) + filerAddress := string(pb.NewServerAddress(*serverIp, *filerOptions.port, *filerOptions.portGrpc)) s3Options.filer = &filerAddress + iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress msgBrokerOptions.filer = &filerAddress @@ -222,25 +249,28 @@ func runServer(cmd *Command, args []string) bool { if *isStartingFiler { go func() { time.Sleep(1 * time.Second) - filerOptions.startFiler() - }() } if *isStartingS3 { go func() { time.Sleep(2 * time.Second) - + s3Options.localFilerSocket = filerOptions.localSocket s3Options.startS3Server() + }() + } + if *isStartingIam { + go func() { + time.Sleep(2 * time.Second) + iamOptions.startIamServer() }() } if *isStartingWebDav { go func() { time.Sleep(2 * time.Second) - webdavOptions.startWebDav() }() diff --git a/weed/command/shell.go b/weed/command/shell.go index 4a9f4b027..c32a8e614 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/shell" @@ -17,6 +18,7 @@ var ( func init() { 
cmdShell.Run = runShell // break init cycle shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333") + shellOptions.FilerGroup = cmdShell.Flag.String("filerGroup", "", "filerGroup for the filers") shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888") shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml") } @@ -36,7 +38,7 @@ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - if *shellOptions.Masters == "" && *shellInitialFiler == "" { + if *shellOptions.Masters == "" { util.LoadConfiguration("shell", false) v := util.GetViper() cluster := v.GetString("cluster.default") @@ -44,22 +46,15 @@ func runShell(command *Command, args []string) bool { cluster = *shellCluster } if cluster == "" { - *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888" + *shellOptions.Masters = "localhost:9333" } else { *shellOptions.Masters = v.GetString("cluster." + cluster + ".master") *shellInitialFiler = v.GetString("cluster." 
+ cluster + ".filer") + fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) } } - fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) - - var err error - shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) - shellOptions.FilerAddress = *shellInitialFiler - if err != nil { - fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err) - return false - } + shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler) shellOptions.Directory = "/" shell.RunShell(shellOptions) diff --git a/weed/command/update.go b/weed/command/update.go new file mode 100644 index 000000000..2d0dc42ad --- /dev/null +++ b/weed/command/update.go @@ -0,0 +1,382 @@ +package command + +import ( + "archive/tar" + "archive/zip" + "bytes" + "compress/gzip" + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "golang.org/x/net/context/ctxhttp" +) + +//copied from https://github.com/restic/restic/tree/master/internal/selfupdate + +// Release collects data about a single release on GitHub. +type Release struct { + Name string `json:"name"` + TagName string `json:"tag_name"` + Draft bool `json:"draft"` + PreRelease bool `json:"prerelease"` + PublishedAt time.Time `json:"published_at"` + Assets []Asset `json:"assets"` + + Version string `json:"-"` // set manually in the code +} + +// Asset is a file uploaded and attached to a release. +type Asset struct { + ID int `json:"id"` + Name string `json:"name"` + URL string `json:"url"` +} + +const githubAPITimeout = 30 * time.Second + +// githubError is returned by the GitHub API, e.g. for rate-limiting. 
+type githubError struct { + Message string +} + +//default version is not full version +var isFullVersion = false + +var ( + updateOpt UpdateOptions +) + +type UpdateOptions struct { + dir *string + name *string + Version *string +} + +func init() { + path, _ := os.Executable() + _, name := filepath.Split(path) + updateOpt.dir = cmdUpdate.Flag.String("dir", filepath.Dir(path), "directory to save new weed.") + updateOpt.name = cmdUpdate.Flag.String("name", name, "name of new weed. On windows, name shouldn't be same to the orignial name.") + updateOpt.Version = cmdUpdate.Flag.String("version", "0", "specific version of weed you want to download. If not specified, get the latest version.") + cmdUpdate.Run = runUpdate +} + +var cmdUpdate = &Command{ + UsageLine: "update [-dir=/path/to/dir] [-name=name] [-version=x.xx]", + Short: "get latest or specific version from https://github.com/chrislusf/seaweedfs", + Long: `get latest or specific version from https://github.com/chrislusf/seaweedfs`, +} + +func runUpdate(cmd *Command, args []string) bool { + path, _ := os.Executable() + _, name := filepath.Split(path) + + if *updateOpt.dir != "" { + if err := util.TestFolderWritable(util.ResolvePath(*updateOpt.dir)); err != nil { + glog.Fatalf("Check Folder(-dir) Writable %s : %s", *updateOpt.dir, err) + return false + } + } else { + *updateOpt.dir = filepath.Dir(path) + } + + if *updateOpt.name == "" { + *updateOpt.name = name + } + + target := filepath.Join(*updateOpt.dir, *updateOpt.name) + + if runtime.GOOS == "windows" { + if target == path { + glog.Fatalf("On windows, name of the new weed shouldn't be same to the orignial name.") + return false + } + } + + glog.V(0).Infof("new weed will be saved to %s", target) + + _, err := downloadRelease(context.Background(), target, *updateOpt.Version) + if err != nil { + glog.Errorf("unable to download weed: %v", err) + return false + } + return true +} + +func downloadRelease(ctx context.Context, target string, ver string) (version 
string, err error) { + currentVersion := util.VERSION_NUMBER + rel, err := GitHubLatestRelease(ctx, ver, "chrislusf", "seaweedfs") + if err != nil { + return "", err + } + + if rel.Version == currentVersion { + if ver == "0" { + glog.V(0).Infof("weed is up to date") + } else { + glog.V(0).Infof("no need to download the same version of weed ") + } + return currentVersion, nil + } + + glog.V(0).Infof("download version: %s", rel.Version) + + largeDiskSuffix := "" + if util.VolumeSizeLimitGB == 8000 { + largeDiskSuffix = "_large_disk" + } + + fullSuffix := "" + if isFullVersion { + fullSuffix = "_full" + } + + ext := "tar.gz" + if runtime.GOOS == "windows" { + ext = "zip" + } + + suffix := fmt.Sprintf("%s_%s%s%s.%s", runtime.GOOS, runtime.GOARCH, fullSuffix, largeDiskSuffix, ext) + md5Filename := fmt.Sprintf("%s.md5", suffix) + _, md5Val, err := getGithubDataFile(ctx, rel.Assets, md5Filename) + if err != nil { + return "", err + } + + downloadFilename, buf, err := getGithubDataFile(ctx, rel.Assets, suffix) + if err != nil { + return "", err + } + + md5Ctx := md5.New() + md5Ctx.Write(buf) + binaryMd5 := md5Ctx.Sum(nil) + if hex.EncodeToString(binaryMd5) != string(md5Val[0:32]) { + glog.Errorf("md5:'%s' '%s'", hex.EncodeToString(binaryMd5), string(md5Val[0:32])) + err = fmt.Errorf("binary md5sum doesn't match") + return "", err + } + + err = extractToFile(buf, downloadFilename, target) + if err != nil { + return "", err + } else { + glog.V(0).Infof("successfully updated weed to version %v\n", rel.Version) + } + + return rel.Version, nil +} + +// GitHubLatestRelease uses the GitHub API to get information about the specific +// release of a repository. 
+func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (Release, error) { + ctx, cancel := context.WithTimeout(ctx, githubAPITimeout) + defer cancel() + + url := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases", owner, repo) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return Release{}, err + } + + // pin API version 3 + req.Header.Set("Accept", "application/vnd.github.v3+json") + + res, err := ctxhttp.Do(ctx, http.DefaultClient, req) + if err != nil { + return Release{}, err + } + + if res.StatusCode != http.StatusOK { + content := res.Header.Get("Content-Type") + if strings.Contains(content, "application/json") { + // try to decode error message + var msg githubError + jerr := json.NewDecoder(res.Body).Decode(&msg) + if jerr == nil { + return Release{}, fmt.Errorf("unexpected status %v (%v) returned, message:\n %v", res.StatusCode, res.Status, msg.Message) + } + } + + _ = res.Body.Close() + return Release{}, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status) + } + + buf, err := ioutil.ReadAll(res.Body) + if err != nil { + _ = res.Body.Close() + return Release{}, err + } + + err = res.Body.Close() + if err != nil { + return Release{}, err + } + + var release Release + var releaseList []Release + err = json.Unmarshal(buf, &releaseList) + if err != nil { + return Release{}, err + } + if ver == "0" { + release = releaseList[0] + glog.V(0).Infof("latest version is %v\n", release.TagName) + } else { + for _, r := range releaseList { + if r.TagName == ver { + release = r + break + } + } + } + + if release.TagName == "" { + return Release{}, fmt.Errorf("can not find the specific version") + } + + release.Version = release.TagName + return release, nil +} + +func getGithubData(ctx context.Context, url string) ([]byte, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + // request binary data + req.Header.Set("Accept", 
"application/octet-stream") + + res, err := ctxhttp.Do(ctx, http.DefaultClient, req) + if err != nil { + return nil, err + } + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status) + } + + buf, err := ioutil.ReadAll(res.Body) + if err != nil { + _ = res.Body.Close() + return nil, err + } + + err = res.Body.Close() + if err != nil { + return nil, err + } + + return buf, nil +} + +func getGithubDataFile(ctx context.Context, assets []Asset, suffix string) (filename string, data []byte, err error) { + var url string + for _, a := range assets { + if strings.HasSuffix(a.Name, suffix) { + url = a.URL + filename = a.Name + break + } + } + + if url == "" { + return "", nil, fmt.Errorf("unable to find file with suffix %v", suffix) + } + + glog.V(0).Infof("download %v\n", filename) + data, err = getGithubData(ctx, url) + if err != nil { + return "", nil, err + } + + return filename, data, nil +} + +func extractToFile(buf []byte, filename, target string) error { + var rd io.Reader = bytes.NewReader(buf) + + switch filepath.Ext(filename) { + case ".gz": + gr, err := gzip.NewReader(rd) + if err != nil { + return err + } + defer gr.Close() + trd := tar.NewReader(gr) + hdr, terr := trd.Next() + if terr != nil { + glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr) + return terr + } + rd = trd + case ".zip": + zrd, err := zip.NewReader(bytes.NewReader(buf), int64(len(buf))) + if err != nil { + return err + } + + if len(zrd.File) != 1 { + return fmt.Errorf("ZIP archive contains more than one file") + } + + file, err := zrd.File[0].Open() + if err != nil { + return err + } + + defer func() { + _ = file.Close() + }() + + rd = file + } + + // Write everything to a temp file + dir := filepath.Dir(target) + new, err := ioutil.TempFile(dir, "weed") + if err != nil { + return err + } + + n, err := io.Copy(new, rd) + if err != nil { + _ = new.Close() + _ = os.Remove(new.Name()) + return err + } + if err = 
new.Sync(); err != nil { + return err + } + if err = new.Close(); err != nil { + return err + } + + mode := os.FileMode(0755) + // attempt to find the original mode + if fi, err := os.Lstat(target); err == nil { + mode = fi.Mode() + } + + // Rename the temp file to the final location atomically. + if err := os.Rename(new.Name(), target); err != nil { + return err + } + + glog.V(0).Infof("saved %d bytes in %v\n", n, target) + return os.Chmod(target, mode) +} diff --git a/weed/command/update_full.go b/weed/command/update_full.go new file mode 100644 index 000000000..529f38219 --- /dev/null +++ b/weed/command/update_full.go @@ -0,0 +1,9 @@ +//go:build elastic && ydb && gocdk && hdfs +// +build elastic,ydb,gocdk,hdfs + +package command + +//set true if gtags are set +func init() { + isFullVersion = true +} diff --git a/weed/command/upload.go b/weed/command/upload.go index 9ae1befab..f2b0b7fe4 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -71,7 +71,7 @@ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master) + defaultReplication, err := readMasterConfiguration(grpcDialOption, pb.ServerAddress(*upload.master)) if err != nil { fmt.Printf("upload: %v", err) return false @@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := 
json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -118,7 +118,7 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) return false } - results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) if err != nil { fmt.Println(err.Error()) return false @@ -129,8 +129,8 @@ func runUpload(cmd *Command, args []string) bool { return true } -func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) { - err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { +func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/command/volume.go b/weed/command/volume.go index 235eff11b..158bdf162 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -2,7 +2,6 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" "net/http" httppprof "net/http/pprof" "os" @@ -11,6 +10,8 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/spf13/viper" "google.golang.org/grpc" @@ -24,7 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" 
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -36,6 +37,7 @@ var ( type VolumeServerOptions struct { port *int + portGrpc *int publicPort *int folders []string folderMaxLimits []int @@ -43,7 +45,8 @@ type VolumeServerOptions struct { ip *string publicUrl *string bindIp *string - masters *string + mastersString *string + masters []pb.ServerAddress idleConnectionTimeout *int dataCenter *string rack *string @@ -62,17 +65,19 @@ type VolumeServerOptions struct { preStopSeconds *int metricsHttpPort *int // pulseSeconds *int - enableTcp *bool + enableTcp *bool + inflightUploadDataTimeout *time.Duration } func init() { cmdVolume.Run = runVolume // break init cycle v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") + v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port") v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") - v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to") - v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. 
If empty, default to same as -ip option.") + v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") @@ -91,7 +96,8 @@ func init() { v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") - v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port") + v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port") + v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") } var cmdVolume = &Command{ @@ -104,7 +110,7 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.") + maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured as free disk space divided by volume size.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (default to 1%). 
Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).") minFreeSpace = cmdVolume.Flag.String("minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). Low disk space will mark all volumes as ReadOnly.") @@ -123,6 +129,7 @@ func runVolume(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*v.metricsHttpPort) minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent) + v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses() v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, minFreeSpaces) return true @@ -189,12 +196,18 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.ip = util.DetectedHostAddress() glog.V(0).Infof("detected volume server ip address: %v", *v.ip) } + if *v.bindIp == "" { + *v.bindIp = *v.ip + } if *v.publicPort == 0 { *v.publicPort = *v.port } + if *v.portGrpc == 0 { + *v.portGrpc = 10000 + *v.port + } if *v.publicUrl == "" { - *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) + *v.publicUrl = util.JoinHostPort(*v.ip, *v.publicPort) } volumeMux := http.NewServeMux() @@ -221,20 +234,19 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } - masters := *v.masters - volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, - *v.ip, *v.port, *v.publicUrl, + *v.ip, *v.port, *v.portGrpc, *v.publicUrl, v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes, *v.idxFolder, volumeNeedleMapKind, - strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, + v.masters, 5, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentDownloadLimitMB)*1024*1024, + *v.inflightUploadDataTimeout, ) // starting grpc server grpcS := 
v.startGrpcService(volumeServer) @@ -258,7 +270,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v stopChan := make(chan bool) grace.OnInterrupt(func() { - fmt.Println("volume server has be killed") + fmt.Println("volume server has been killed") // Stop heartbeats if !volumeServer.StopHeartbeat() { @@ -307,8 +319,8 @@ func (v VolumeServerOptions) isSeparatedPublicPort() bool { } func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server { - grpcPort := *v.port + 10000 - grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *v.portGrpc + grpcL, err := util.NewListener(util.JoinHostPort(*v.bindIp, grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } @@ -324,7 +336,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe } func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { - publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + publicListeningAddress := util.JoinHostPort(*v.bindIp, *v.publicPort) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { @@ -351,7 +363,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd keyFile = viper.GetString("https.volume.key") } - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + listeningAddress := util.JoinHostPort(*v.bindIp, *v.port) glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress) listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { @@ -359,11 +371,18 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } httpDown := httpdown.HTTP{ - 
KillTimeout: 5 * time.Minute, - StopTimeout: 5 * time.Minute, + KillTimeout: time.Minute, + StopTimeout: 30 * time.Second, CertFile: certFile, KeyFile: keyFile} - clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener) + httpS := &http.Server{Handler: handler} + + if viper.GetString("https.volume.ca") != "" { + clientCertFile := viper.GetString("https.volume.ca") + httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile) + } + + clusterHttpServer := httpDown.Serve(httpS, listener) go func() { if e := clusterHttpServer.Wait(); e != nil { glog.Fatalf("Volume server fail to serve: %v", e) @@ -373,7 +392,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000) + listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) listener, e := util.NewListener(listeningAddress, 0) if e != nil { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 781ea1e36..689bf3c30 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -43,7 +43,7 @@ func init() { webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") - webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB") + webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 0, "local cache capacity in MB") } var cmdWebDav = &Command{ @@ -78,46 +78,41 @@ func (wo *WebDavOption) startWebDav() bool { } // parse filer grpc address - 
filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*wo.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool // connect to filer for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) break } } ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ - Filer: *wo.filer, - FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: grpcDialOption, - Collection: *wo.collection, - Replication: *wo.replication, - DiskType: *wo.disk, - Uid: uid, - Gid: gid, - Cipher: cipher, - CacheDir: util.ResolvePath(*wo.cacheDir), - CacheSizeMB: *wo.cacheSizeMB, + Filer: filerAddress, + GrpcDialOption: grpcDialOption, + Collection: *wo.collection, + Replication: *wo.replication, + DiskType: *wo.disk, + Uid: uid, + Gid: gid, + Cipher: cipher, + CacheDir: util.ResolvePath(*wo.cacheDir), + CacheSizeMB: *wo.cacheSizeMB, }) if webdavServer_err != nil { glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) |
