diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/filer.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/filer.go')
| -rw-r--r-- | weed/command/filer.go | 103 |
1 files changed, 77 insertions, 26 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index ddee0852c..c9f9a1956 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -2,10 +2,10 @@ package command import ( "fmt" + "net" "net/http" "os" - "strconv" - "strings" + "runtime" "time" "google.golang.org/grpc/reflection" @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -30,11 +30,14 @@ var ( ) type FilerOptions struct { - masters *string + masters map[string]pb.ServerAddress + mastersString *string ip *string bindIp *string port *int + portGrpc *int publicPort *int + filerGroup *string collection *string defaultReplicaPlacement *string disableDirListing *bool @@ -45,22 +48,25 @@ type FilerOptions struct { enableNotification *bool disableHttp *bool cipher *bool - peers *string metricsHttpPort *int saveToFilerLimit *int defaultLevelDbDirectory *string concurrentUploadLimitMB *int debug *bool debugPort *int + localSocket *string + showUIDirectoryDelete *bool } func init() { cmdFiler.Run = runFiler // break init cycle - f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") - f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to") + f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") + f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port") f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") @@ -70,22 +76,26 @@ func init() { f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack") f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed") f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") - f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging") + f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock") + f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port") + filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") - filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") + filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") + filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders") + filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") @@ -96,10 +106,11 @@ func init() { filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file") filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file") filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") - filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB") + filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") // start iam on filer filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service") + filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address") filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port") } @@ -134,10 +145,12 @@ func runFiler(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*f.metricsHttpPort) - filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port) + filerAddress := util.JoinHostPort(*f.ip, *f.port) startDelay := time.Duration(2) if *filerStartS3 { filerS3Options.filer = &filerAddress + filerS3Options.bindIp = f.bindIp + filerS3Options.localFilerSocket = f.localSocket go func() { time.Sleep(startDelay * time.Second) filerS3Options.startS3Server() @@ -156,13 +169,15 @@ func runFiler(cmd *Command, args []string) bool { if *filerStartIam { filerIamOptions.filer = &filerAddress - filerIamOptions.masters = f.masters + filerIamOptions.masters = f.mastersString go func() { time.Sleep(startDelay * time.Second) filerIamOptions.startIamServer() }() } + f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap() + f.startFiler() return true @@ -176,16 +191,20 @@ func (fo *FilerOptions) startFiler() { if *fo.publicPort != 0 { publicVolumeMux = http.NewServeMux() } + if *fo.portGrpc == 0 { + *fo.portGrpc = 10000 + *fo.port + } + if *fo.bindIp == "" { + *fo.bindIp = *fo.ip + } defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") - var peers []string - if *fo.peers != "" { - peers = strings.Split(*fo.peers, ",") - } + filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*fo.masters, ","), + Masters: fo.masters, + FilerGroup: *fo.filerGroup, Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, DisableDirListing: *fo.disableDirListing, @@ -195,21 +214,20 @@ func (fo *FilerOptions) startFiler() { Rack: *fo.rack, DefaultLevelDbDir: defaultLevelDbDirectory, DisableHttp: *fo.disableHttp, - Host: *fo.ip, - Port: uint32(*fo.port), + Host: filerAddress, Cipher: *fo.cipher, SaveToFilerLimit: int64(*fo.saveToFilerLimit), - Filers: peers, ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, + ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) } if *fo.publicPort != 0 { - publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort) + publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort) glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress) - publicListener, e := util.NewListener(publicListeningAddress, 0) + publicListener, localPublicListner, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0) if e != nil { glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e) } @@ -218,11 +236,18 @@ func (fo *FilerOptions) startFiler() { glog.Fatalf("Volume server fail to serve public: %v", e) } }() + if localPublicListner != nil { + go func() { + if e := http.Serve(localPublicListner, publicVolumeMux); e != nil { + glog.Errorf("Volume server fail to serve public: %v", e) + } + }() + } } glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port) - filerListener, e := util.NewListener( - *fo.bindIp+":"+strconv.Itoa(*fo.port), + filerListener, filerLocalListener, e := util.NewIpAndLocalListeners( + *fo.bindIp, *fo.port, time.Duration(10)*time.Second, ) if e != nil { @@ -230,17 +255,43 @@ func (fo *FilerOptions) startFiler() { } // starting grpc server - grpcPort := *fo.port + 10000 - grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *fo.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } go grpcS.Serve(grpcL) httpS := &http.Server{Handler: defaultMux} + if runtime.GOOS != "windows" { + if *fo.localSocket == "" { + *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port) + } + if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error()) + } + go func() { + // start on local unix socket + filerSocketListener, err := net.Listen("unix", *fo.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err) + } + httpS.Serve(filerSocketListener) + }() + } + if filerLocalListener != nil { + go func() { + if err := httpS.Serve(filerLocalListener); err != nil { + glog.Errorf("Filer Fail to serve: %v", e) + } + }() + } if err := httpS.Serve(filerListener); err != nil { glog.Fatalf("Filer Fail to serve: %v", e) } |
