diff options
Diffstat (limited to 'weed/command/s3.go')
| -rw-r--r-- | weed/command/s3.go | 109 |
1 files changed, 80 insertions, 29 deletions
diff --git a/weed/command/s3.go b/weed/command/s3.go index c8292a7d5..42e447d90 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -3,11 +3,14 @@ package command import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "google.golang.org/grpc/reflection" "net/http" "time" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/gorilla/mux" @@ -23,26 +26,35 @@ var ( ) type S3Options struct { - filer *string - port *int - config *string - domainName *string - tlsPrivateKey *string - tlsCertificate *string - metricsHttpPort *int - allowEmptyFolder *bool + filer *string + bindIp *string + port *int + portGrpc *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string + metricsHttpPort *int + allowEmptyFolder *bool + allowDeleteBucketNotEmpty *bool + auditLogConfig *string + localFilerSocket *string } func init() { cmdS3.Run = runS3 // break init cycle s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") + s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") + s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") + s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file") s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") - s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders") + s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders") + s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") } var cmdS3 = &Command{ @@ -137,11 +149,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*s3opt.filer) filerBucketsPath := "/buckets" @@ -152,10 +160,10 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } filerBucketsPath = resp.DirBuckets metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec) @@ -163,10 +171,10 @@ func (s3opt *S3Options) startS3Server() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) break } } @@ -175,15 +183,16 @@ func (s3opt *S3Options) startS3Server() bool { router := mux.NewRouter().SkipClean(true) - _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: *s3opt.filer, - Port: *s3opt.port, - FilerGrpcAddress: filerGrpcAddress, - Config: *s3opt.config, - DomainName: *s3opt.domainName, - BucketsPath: filerBucketsPath, - GrpcDialOption: grpcDialOption, - AllowEmptyFolder: *s3opt.allowEmptyFolder, + s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ + Filer: filerAddress, + Port: *s3opt.port, + Config: *s3opt.config, + DomainName: *s3opt.domainName, + BucketsPath: filerBucketsPath, + GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, + AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, + LocalFilerSocket: s3opt.localFilerSocket, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) @@ -191,19 +200,61 @@ func (s3opt *S3Options) startS3Server() bool { httpS := &http.Server{Handler: router} - listenAddress := fmt.Sprintf(":%d", *s3opt.port) - s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) + if *s3opt.portGrpc == 0 { + *s3opt.portGrpc = 10000 + *s3opt.port + } + if *s3opt.bindIp == "" { + *s3opt.bindIp = "localhost" + } + + listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port) + s3ApiListener, s3ApiLocalListner, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second) if err != nil { glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err) } + if len(*s3opt.auditLogConfig) > 0 { + s3err.InitAuditLog(*s3opt.auditLogConfig) + if s3err.Logger != nil { + defer s3err.Logger.Close() + } + } + + // starting grpc server + grpcPort := *s3opt.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0) + if err != nil { + glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3")) + s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer) + reflection.Register(grpcS) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } + go grpcS.Serve(grpcL) + if *s3opt.tlsPrivateKey != "" { glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port) + if s3ApiLocalListner != nil { + go func() { + if err = httpS.ServeTLS(s3ApiLocalListner, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil { + glog.Fatalf("S3 API Server Fail to serve: %v", err) + } + }() + } if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } } else { glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port) + if s3ApiLocalListner != nil { + go func() { + if err = httpS.Serve(s3ApiLocalListner); err != nil { + glog.Fatalf("S3 API Server Fail to serve: %v", err) + } + }() + } if err = httpS.Serve(s3ApiListener); err != nil { glog.Fatalf("S3 API Server Fail to serve: %v", err) } |
