diff options
Diffstat (limited to 'weed/command')
29 files changed, 248 insertions, 82 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index a8be4838e..f9b9fba64 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -66,7 +66,7 @@ var cmdBackup = &Command{ func runBackup(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") if *s.volumeId == -1 { @@ -138,7 +138,9 @@ func runBackup(cmd *Command, args []string) bool { if datSize > stats.TailOffset { // remove the old data - v.Destroy(false) + if err := v.Destroy(false); err != nil { + fmt.Printf("Error destroying volume: %v\n", err) + } // recreate an empty volume v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0) if err != nil { diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 2a0db47c2..bc7ee1292 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) type BenchmarkOptions struct { @@ -111,7 +112,7 @@ var ( func runBenchmark(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) @@ -214,7 +215,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if isSecure { jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid) } - if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { + if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ } else { s.failed++ @@ -295,7 +296,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, _, err = util.Get(url) + bytes, _, err = util_http.Get(url) if err == nil { break } diff --git a/weed/command/download.go b/weed/command/download.go index 060be9f14..1b7098824 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -47,7 +48,7 @@ var cmdDownload = &Command{ } func runDownload(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for _, fid := range args { @@ -63,11 +64,11 @@ func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpti if lookupError != nil { return lookupError } - filename, _, rc, err := util.DownloadFile(fileUrl, jwt) + filename, _, rc, err := util_http.DownloadFile(fileUrl, jwt) if err != nil { return err } - defer util.CloseResponse(rc) + defer util_http.CloseResponse(rc) if filename == "" { filename = fileId } @@ -116,10 +117,10 @@ func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption return "", nil, lookupError } var rc *http.Response - if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil { + if filename, _, rc, e = util_http.DownloadFile(fileUrl, jwt); e != nil { return "", nil, e } - defer util.CloseResponse(rc) + defer util_http.CloseResponse(rc) content, e = io.ReadAll(rc.Body) return } diff --git a/weed/command/filer.go b/weed/command/filer.go index 877c4b5d5..b7f67ea3b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -1,6 +1,9 @@ package command import ( + "context" + "crypto/tls" + "crypto/x509" "fmt" "net" "net/http" @@ -10,8 +13,6 @@ import ( "strings" "time" - "google.golang.org/grpc/reflection" - "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -20,6 +21,10 @@ import ( weed_server "github.com/seaweedfs/seaweedfs/weed/server" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/spf13/viper" + "google.golang.org/grpc/credentials/tls/certprovider" + "google.golang.org/grpc/credentials/tls/certprovider/pemfile" + "google.golang.org/grpc/reflection" ) var ( @@ -52,6 +57,7 @@ type FilerOptions struct { disableHttp *bool cipher *bool metricsHttpPort *int + metricsHttpIp *string saveToFilerLimit *int defaultLevelDbDirectory *string concurrentUploadLimitMB *int @@ -63,7 +69,7 @@ type FilerOptions struct { diskType *string allowedOrigins *string exposeDirectoryData *bool - joinExistingFiler *bool + certProvider certprovider.Provider } func init() { @@ -85,6 +91,7 @@ func init() { f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + f.metricsHttpIp = cmdFiler.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") @@ -96,7 +103,6 @@ func init() { f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins") f.exposeDirectoryData = cmdFiler.Flag.Bool("exposeDirectoryData", true, "whether to return directory metadata and content in Filer UI") - f.joinExistingFiler = cmdFiler.Flag.Bool("joinExistingFiler", false, "enable if new filer wants to join existing cluster") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -124,6 +130,7 @@ func init() { filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") + filerWebDavOptions.maxMB = cmdFiler.Flag.Int("webdav.maxMB", 4, "split files larger than the limit") filerWebDavOptions.filerRootPath = cmdFiler.Flag.String("webdav.filer.path", "/", "use this remote path from filer server") // start iam on filer @@ -172,9 +179,17 @@ func runFiler(cmd *Command, args []string) bool { go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil) } - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() - go stats_collect.StartMetricsServer(*f.bindIp, *f.metricsHttpPort) + switch { + case *f.metricsHttpIp != "": + // noting to do, use f.metricsHttpIp + case *f.bindIp != "": + *f.metricsHttpIp = *f.bindIp + case *f.ip != "": + *f.metricsHttpIp = *f.ip + } + go stats_collect.StartMetricsServer(*f.metricsHttpIp, *f.metricsHttpPort) filerAddress := pb.NewServerAddress(*f.ip, *f.port, *f.portGrpc).String() startDelay := time.Duration(2) @@ -222,6 +237,15 @@ func runFiler(cmd *Command, args []string) bool { return true } +// GetCertificateWithUpdate Auto refreshing TSL certificate +func (fo *FilerOptions) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) { + certs, err := fo.certProvider.KeyMaterial(context.Background()) + if certs == nil { + return nil, err + } + return &certs.Certs[0], err +} + func (fo *FilerOptions) startFiler() { defaultMux := http.NewServeMux() @@ -264,7 +288,6 @@ func (fo *FilerOptions) startFiler() { DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, DiskType: *fo.diskType, AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), - JoinExistingFiler: *fo.joinExistingFiler, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) @@ -332,15 +355,62 @@ func (fo *FilerOptions) startFiler() { httpS.Serve(filerSocketListener) }() } - if filerLocalListener != nil { - go func() { - if err := httpS.Serve(filerLocalListener); err != nil { - glog.Errorf("Filer Fail to serve: %v", e) + + if viper.GetString("https.filer.key") != "" { + certFile := viper.GetString("https.filer.cert") + keyFile := viper.GetString("https.filer.key") + caCertFile := viper.GetString("https.filer.ca") + disbaleTlsVerifyClientCert := viper.GetBool("https.filer.disable_tls_verify_client_cert") + + pemfileOptions := pemfile.Options{ + CertFile: certFile, + KeyFile: keyFile, + RefreshDuration: security.CredRefreshingInterval, + } + if fo.certProvider, err = pemfile.NewProvider(pemfileOptions); err != nil { + glog.Fatalf("pemfile.NewProvider(%v) failed: %v", pemfileOptions, err) + } + + caCertPool := x509.NewCertPool() + if caCertFile != "" { + caCertFile, err := os.ReadFile(caCertFile) + if err != nil { + glog.Fatalf("error reading CA certificate: %v", err) } - }() - } - if err := httpS.Serve(filerListener); err != nil { - glog.Fatalf("Filer Fail to serve: %v", e) - } + caCertPool.AppendCertsFromPEM(caCertFile) + } + + clientAuth := tls.NoClientCert + if !disbaleTlsVerifyClientCert { + clientAuth = tls.RequireAndVerifyClientCert + } + + httpS.TLSConfig = &tls.Config{ + GetCertificate: fo.GetCertificateWithUpdate, + ClientAuth: clientAuth, + ClientCAs: caCertPool, + } + if filerLocalListener != nil { + go func() { + if err := httpS.ServeTLS(filerLocalListener, "", ""); err != nil { + glog.Errorf("Filer Fail to serve: %v", e) + } + }() + } + if err := httpS.ServeTLS(filerListener, "", ""); err != nil { + glog.Fatalf("Filer Fail to serve: %v", e) + } + } else { + if filerLocalListener != nil { + go func() { + if err := httpS.Serve(filerLocalListener); err != nil { + glog.Errorf("Filer Fail to serve: %v", e) + } + }() + } + if err := httpS.Serve(filerListener); err != nil { + glog.Fatalf("Filer Fail to serve: %v", e) + } + } } diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 4aeab60f2..1344dfd2c 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "regexp" + "strings" "time" ) @@ -58,7 +59,7 @@ var cmdFilerBackup = &Command{ func runFilerBackup(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() util.LoadConfiguration("replication", true) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") @@ -148,17 +149,22 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti }() } + prefix := sourcePath + if !strings.HasSuffix(prefix, "/") { + prefix = prefix + "/" + } + metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "backup_" + dataSink.GetName(), ClientId: clientId, ClientEpoch: clientEpoch, SelfSignature: 0, - PathPrefix: sourcePath, + PathPrefix: prefix, AdditionalPathPrefixes: nil, DirectoriesToWatch: nil, StartTsNs: startFrom.UnixNano(), StopTsNs: 0, - EventErrorType: pb.TrivialOnError, + EventErrorType: pb.RetryForeverOnError, } return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 2ef3bfc33..ba3625b0d 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -59,7 +59,7 @@ var cmdFilerCat = &Command{ func runFilerCat(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() if len(args) == 0 { return false diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index df5e002c5..0342aa585 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -83,7 +83,7 @@ var cmdFilerCopy = &Command{ func runCopy(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() if len(args) <= 1 { return false @@ -344,7 +344,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return err } - finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return uploaderErr + } + + finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, @@ -423,7 +428,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, <-concurrentChunks }() - fileId, uploadResult, err, _ := operation.UploadWithRetry( + uploader, err := operation.NewUploader() + if err != nil { + uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err) + return + } + + fileId, uploadResult, err, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, @@ -472,7 +483,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func(_ context.Context) pb.ServerAddress { + operation.DeleteFileIds(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError @@ -535,8 +546,12 @@ func detectMimeType(f *os.File) string { } func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return nil, fmt.Errorf("upload data: %v", uploaderErr) + } - finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( worker, &filer_pb.AssignVolumeRequest{ Count: 1, diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index ff4a61e41..e8c4680ba 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/viper" "google.golang.org/grpc" "reflect" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -55,7 +56,7 @@ The backup writes to another filer store specified in a backup_filer.toml. func runFilerMetaBackup(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") // load backup_filer.toml @@ -63,9 +64,9 @@ func runFilerMetaBackup(cmd *Command, args []string) bool { v.SetConfigFile(*metaBackup.backupFilerConfig) if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file - glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+ + glog.Fatalf("Failed to load %s file: %v\nPlease use this command to generate the a %s.toml file\n"+ " weed scaffold -config=%s -output=.\n\n\n", - *metaBackup.backupFilerConfig, "backup_filer", "filer") + *metaBackup.backupFilerConfig, err, "backup_filer", "filer") } if err := metaBackup.initStore(v); err != nil { @@ -197,17 +198,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { metaBackup.clientEpoch++ + prefix := *metaBackup.filerDirectory + if !strings.HasSuffix(prefix, "/") { + prefix = prefix + "/" + } metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "meta_backup", ClientId: metaBackup.clientId, ClientEpoch: metaBackup.clientEpoch, SelfSignature: 0, - PathPrefix: *metaBackup.filerDirectory, + PathPrefix: prefix, AdditionalPathPrefixes: nil, DirectoriesToWatch: nil, StartTsNs: startTime.UnixNano(), StopTsNs: 0, - EventErrorType: pb.TrivialOnError, + EventErrorType: pb.RetryForeverOnError, } return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 32855072b..d7a169535 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -45,7 +45,7 @@ var ( func runFilerMetaTail(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") clientId := util.RandomInt32() diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 61a5d26a2..78357cc04 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -78,7 +78,7 @@ var cmdFilerRemoteGateway = &Command{ func runFilerRemoteGateway(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") remoteGatewayOptions.grpcDialOption = grpcDialOption diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 9bb59dabb..f6fe9a99c 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -55,12 +55,12 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo ClientId: option.clientId, ClientEpoch: option.clientEpoch, SelfSignature: 0, - PathPrefix: option.bucketsDir, + PathPrefix: option.bucketsDir + "/", AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, DirectoriesToWatch: nil, StartTsNs: lastOffsetTs.UnixNano(), StopTsNs: 0, - EventErrorType: pb.TrivialOnError, + EventErrorType: pb.RetryForeverOnError, } return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 2d6133367..77dd95134 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -73,7 +73,7 @@ var cmdFilerRemoteSynchronize = &Command{ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") remoteSyncOptions.grpcDialOption = grpcDialOption diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 76f7e46d5..186523e45 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -64,17 +64,22 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour option.clientEpoch++ + prefix := mountedDir + if !strings.HasSuffix(prefix, "/") { + prefix = prefix + "/" + } + metadataFollowOption := &pb.MetadataFollowOption{ ClientName: "filer.remote.sync", ClientId: option.clientId, ClientEpoch: option.clientEpoch, SelfSignature: 0, - PathPrefix: mountedDir, + PathPrefix: prefix, AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, DirectoriesToWatch: nil, StartTsNs: lastOffsetTs.UnixNano(), StopTsNs: 0, - EventErrorType: pb.TrivialOnError, + EventErrorType: pb.RetryForeverOnError, } return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 4fca8158a..f53fdfb48 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -30,7 +30,7 @@ var cmdFilerReplicate = &Command{ func runFilerReplicate(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() util.LoadConfiguration("replication", true) util.LoadConfiguration("notification", true) config := util.GetViper() diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9ad76e31b..90204af4a 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -118,7 +118,7 @@ var cmdFilerSynchronize = &Command{ func runFilerSynchronize(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) @@ -296,12 +296,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) }) + prefix := sourcePath + if !strings.HasSuffix(prefix, "/") { + prefix = prefix + "/" + } + metadataFollowOption := &pb.MetadataFollowOption{ ClientName: clientName, ClientId: clientId, ClientEpoch: clientEpoch, SelfSignature: targetFilerSignature, - PathPrefix: sourcePath, + PathPrefix: prefix, AdditionalPathPrefixes: nil, DirectoriesToWatch: nil, StartTsNs: sourceFilerOffsetTsNs, @@ -469,14 +474,14 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str } } else { - // new key is outside of the watched directory + // new key is outside the watched directory if doDeleteFiles { key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) } } } else { - // old key is outside of the watched directory + // old key is outside the watched directory if strings.HasPrefix(string(sourceNewKey), sourcePath) { // new key is in the watched directory key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath) @@ -486,7 +491,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str return nil } } else { - // new key is also outside of the watched directory + // new key is also outside the watched directory // skip } } diff --git a/weed/command/iam.go b/weed/command/iam.go index 95964994f..fa21803dd 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -47,7 +47,7 @@ func runIam(cmd *Command, args []string) bool { func (iamopt *IamOptions) startIamServer() bool { filerAddress := pb.ServerAddress(*iamopt.filer) - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/command/master.go b/weed/command/master.go index f80d8faeb..914853d88 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -53,6 +53,7 @@ type MasterOptions struct { metricsIntervalSec *int raftResumeState *bool metricsHttpPort *int + metricsHttpIp *string heartbeatInterval *time.Duration electionTimeout *time.Duration raftHashicorp *bool @@ -77,6 +78,7 @@ func init() { m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>") m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds") m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + m.metricsHttpIp = cmdMaster.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") @@ -103,7 +105,7 @@ var ( func runMaster(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() util.LoadConfiguration("master", false) grace.SetupProfiling(*masterCpuProfile, *masterMemProfile) @@ -121,7 +123,15 @@ func runMaster(cmd *Command, args []string) bool { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } - go stats_collect.StartMetricsServer(*m.ipBind, *m.metricsHttpPort) + switch { + case *m.metricsHttpIp != "": + // noting to do, use m.metricsHttpIp + case *m.ipBind != "": + *m.metricsHttpIp = *m.ipBind + case *m.ip != "": + *m.metricsHttpIp = *m.ip + } + go stats_collect.StartMetricsServer(*m.metricsHttpIp, *m.metricsHttpPort) startMaster(m, masterWhiteList) return true @@ -180,10 +190,10 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } } ms.SetRaftServer(raftServer) - r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") - r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD") + r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods(http.MethodGet) + r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods(http.MethodGet, http.MethodHead) if *masterOption.raftHashicorp { - r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods(http.MethodGet) } // starting grpc server grpcPort := *masterOption.portGrpc diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index 7217aff0b..504ddb6c3 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -68,7 +68,7 @@ var cmdMasterFollower = &Command{ func runMasterFollower(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() util.LoadConfiguration("master", false) if *mf.portGrpc == 0 { diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 742c38180..a5325b11e 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -66,7 +66,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { // try to connect to filer filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses() - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool var err error diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index e093ebc56..5eb304204 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -54,7 +54,7 @@ var cmdMqBroker = &Command{ func runMqBroker(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap() diff --git a/weed/command/s3.go b/weed/command/s3.go index b7bb2a546..e568de91b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -49,6 +49,7 @@ type S3Options struct { tlsCACertificate *string tlsVerifyClientCert *bool metricsHttpPort *int + metricsHttpIp *string allowEmptyFolder *bool allowDeleteBucketNotEmpty *bool auditLogConfig *string @@ -75,6 +76,7 @@ func init() { s3StandaloneOptions.tlsCACertificate = cmdS3.Flag.String("cacert.file", "", "path to the TLS CA certificate file") s3StandaloneOptions.tlsVerifyClientCert = cmdS3.Flag.Bool("tlsVerifyClientCert", false, "whether to verify the client's certificate") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + s3StandaloneOptions.metricsHttpIp = cmdS3.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders") s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path") @@ -163,17 +165,26 @@ var cmdS3 = &Command{ func runS3(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() - go stats_collect.StartMetricsServer(*s3StandaloneOptions.bindIp, *s3StandaloneOptions.metricsHttpPort) + switch { + case *s3StandaloneOptions.metricsHttpIp != "": + // noting to do, use s3StandaloneOptions.metricsHttpIp + case *s3StandaloneOptions.bindIp != "": + *s3StandaloneOptions.metricsHttpIp = *s3StandaloneOptions.bindIp + } + go stats_collect.StartMetricsServer(*s3StandaloneOptions.metricsHttpIp, *s3StandaloneOptions.metricsHttpPort) return s3StandaloneOptions.startS3Server() } // GetCertificateWithUpdate Auto refreshing TSL certificate -func (S3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) { - certs, err := S3opt.certProvider.KeyMaterial(context.Background()) +func (s3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) { + certs, err := s3opt.certProvider.KeyMaterial(context.Background()) + if certs == nil { + return nil, err + } return &certs.Certs[0], err } @@ -320,6 +331,10 @@ func (s3opt *S3Options) startS3Server() bool { ClientAuth: clientAuth, ClientCAs: caCertPool, } + err = security.FixTlsConfig(util.GetViper(), httpS.TLSConfig) + if err != nil { + glog.Fatalf("error with tls config: %v", err) + } if *s3opt.portHttps == 0 { glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port) if s3ApiLocalListener != nil { diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 574125207..728aecb53 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -285,7 +285,7 @@ password = "" ssl = false ssl_ca_file = "" ssl_cert_file = "" -ssl_key_file = " +ssl_key_file = "" insecure_skip_verify = false option_pool_size = 0 database = "seaweedfs" diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index c5b2a563c..113e5b016 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -95,18 +95,29 @@ allowed_commonNames = "" # comma-separated SSL certificate common names cert = "" key = "" -# volume server https options -# Note: work in progress! -# this does not work with other clients, e.g., "weed filer|mount" etc, yet. +# https client for master|volume|filer|etc connection +# It is necessary that the parameters [https.volume]|[https.master]|[https.filer] are set [https.client] enabled = true +cert = "" +key = "" +ca = "" +# volume server https options [https.volume] cert = "" key = "" ca = "" +# master server https options [https.master] cert = "" key = "" ca = "" + +# filer server https options +[https.filer] +cert = "" +key = "" +ca = "" +# disable_tls_verify_client_cert = true|false (default: false) diff --git a/weed/command/server.go b/weed/command/server.go index 503927629..ddcaf1f7e 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -66,6 +66,7 @@ var ( volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).") volumeMinFreeSpace = cmdServer.Flag.String("volume.minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). Low disk space will mark all volumes as ReadOnly.") serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + serverMetricsHttpIp = cmdServer.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server") @@ -97,7 +98,7 @@ func init() { masterOptions.metricsIntervalSec = cmdServer.Flag.Int("master.metrics.intervalSeconds", 15, "Prometheus push interval in seconds") masterOptions.raftResumeState = cmdServer.Flag.Bool("master.resumeState", false, "resume previous state on start master server") masterOptions.raftHashicorp = cmdServer.Flag.Bool("master.raftHashicorp", false, "use hashicorp raft") - masterOptions.raftBootstrap = cmdMaster.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster") + masterOptions.raftBootstrap = cmdServer.Flag.Bool("master.raftBootstrap", false, "Whether to bootstrap the Raft cluster") masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers") @@ -119,7 +120,6 @@ func init() { filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second") filerOptions.diskType = cmdServer.Flag.String("filer.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") filerOptions.exposeDirectoryData = cmdServer.Flag.Bool("filer.exposeDirectoryData", true, "expose directory data via filer. If false, filer UI will be innaccessible.") - filerOptions.joinExistingFiler = cmdServer.Flag.Bool("filer.joinExistingFiler", false, "enable if new filer wants to join existing cluster") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") @@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool { go http.ListenAndServe(fmt.Sprintf(":%d", *serverOptions.debugPort), nil) } - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() util.LoadConfiguration("master", false) grace.SetupProfiling(*serverOptions.cpuprofile, *serverOptions.memprofile) @@ -207,6 +207,10 @@ func runServer(cmd *Command, args []string) bool { serverBindIp = serverIp } + if *serverMetricsHttpIp == "" { + *serverMetricsHttpIp = *serverBindIp + } + // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp @@ -245,7 +249,7 @@ func runServer(cmd *Command, args []string) bool { webdavOptions.filer = &filerAddress mqBrokerOptions.filerGroup = filerOptions.filerGroup - go stats_collect.StartMetricsServer(*serverBindIp, *serverMetricsHttpPort) + go stats_collect.StartMetricsServer(*serverMetricsHttpIp, *serverMetricsHttpPort) folders := strings.Split(*volumeDataFolders, ",") diff --git a/weed/command/shell.go b/weed/command/shell.go index f78ba89fc..1e921411b 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -35,7 +35,7 @@ var cmdShell = &Command{ func runShell(command *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") shellOptions.Directory = "/" diff --git a/weed/command/update.go b/weed/command/update.go index 314a903f2..bf871d654 100644 --- a/weed/command/update.go +++ b/weed/command/update.go @@ -20,6 +20,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "golang.org/x/net/context/ctxhttp" ) @@ -198,7 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R if err != nil { return Release{}, err } - defer util.CloseResponse(res) + defer util_http.CloseResponse(res) if res.StatusCode != http.StatusOK { content := res.Header.Get("Content-Type") @@ -258,7 +259,7 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) { if err != nil { return nil, err } - defer util.CloseResponse(res) + defer util_http.CloseResponse(res) if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status) @@ -308,7 +309,12 @@ func extractToFile(buf []byte, filename, target string) error { trd := tar.NewReader(gr) hdr, terr := trd.Next() if terr != nil { - glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr) + if hdr != nil { + glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr) + } else { + glog.Errorf("uncompress file is nil, failed:%s", terr) + } + return terr } rd = trd diff --git a/weed/command/upload.go b/weed/command/upload.go index 3e6b8f9a2..7135a707a 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -69,7 +69,7 @@ var cmdUpload = &Command{ func runUpload(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") defaultReplication, err := readMasterConfiguration(grpcDialOption, pb.ServerAddress(*upload.master)) diff --git a/weed/command/volume.go b/weed/command/volume.go index 852989d1f..1078d8d6c 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -65,6 +65,7 @@ type VolumeServerOptions struct { pprof *bool preStopSeconds *int metricsHttpPort *int + metricsHttpIp *string // pulseSeconds *int inflightUploadDataTimeout *time.Duration hasSlowRead *bool @@ -99,6 +100,7 @@ func init() { v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + v.metricsHttpIp = cmdVolume.Flag.String("metricsIp", "", "metrics listen ip. If empty, default to same as -ip.bind option.") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", true, "<experimental> if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") @@ -123,7 +125,7 @@ var ( func runVolume(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() // If --pprof is set we assume the caller wants to be able to collect // cpu and memory profiles via go tool pprof @@ -131,7 +133,15 @@ func runVolume(cmd *Command, args []string) bool { grace.SetupProfiling(*v.cpuProfile, *v.memProfile) } - go stats_collect.StartMetricsServer(*v.bindIp, *v.metricsHttpPort) + switch { + case *v.metricsHttpIp != "": + // noting to do, use v.metricsHttpIp + case *v.bindIp != "": + *v.metricsHttpIp = *v.bindIp + case *v.ip != "": + *v.metricsHttpIp = *v.ip + } + go stats_collect.StartMetricsServer(*v.metricsHttpIp, *v.metricsHttpPort) minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent) v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses() diff --git a/weed/command/webdav.go b/weed/command/webdav.go index f0e738f4a..1d1a43eda 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -60,7 +60,7 @@ var cmdWebDav = &Command{ func runWebDav(cmd *Command, args []string) bool { - util.LoadConfiguration("security", false) + util.LoadSecurityConfiguration() glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.Version(), *webDavStandaloneOptions.port) |
