aboutsummaryrefslogtreecommitdiff
path: root/weed/command/s3.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/s3.go')
-rw-r--r--weed/command/s3.go109
1 files changed, 80 insertions, 29 deletions
diff --git a/weed/command/s3.go b/weed/command/s3.go
index c8292a7d5..42e447d90 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -3,11 +3,14 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "google.golang.org/grpc/reflection"
"net/http"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/gorilla/mux"
@@ -23,26 +26,35 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
- metricsHttpPort *int
- allowEmptyFolder *bool
+ filer *string
+ bindIp *string
+ port *int
+ portGrpc *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
+ allowEmptyFolder *bool
+ allowDeleteBucketNotEmpty *bool
+ auditLogConfig *string
+ localFilerSocket *string
}
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
+ s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
+ s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
+ s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
- s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
+ s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
+ s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
}
var cmdS3 = &Command{
@@ -137,11 +149,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*s3opt.filer)
filerBucketsPath := "/buckets"
@@ -152,10 +160,10 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
filerBucketsPath = resp.DirBuckets
metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
@@ -163,10 +171,10 @@ func (s3opt *S3Options) startS3Server() bool {
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
break
}
}
@@ -175,15 +183,16 @@ func (s3opt *S3Options) startS3Server() bool {
router := mux.NewRouter().SkipClean(true)
- _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
- Filer: *s3opt.filer,
- Port: *s3opt.port,
- FilerGrpcAddress: filerGrpcAddress,
- Config: *s3opt.config,
- DomainName: *s3opt.domainName,
- BucketsPath: filerBucketsPath,
- GrpcDialOption: grpcDialOption,
- AllowEmptyFolder: *s3opt.allowEmptyFolder,
+ s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
+ Filer: filerAddress,
+ Port: *s3opt.port,
+ Config: *s3opt.config,
+ DomainName: *s3opt.domainName,
+ BucketsPath: filerBucketsPath,
+ GrpcDialOption: grpcDialOption,
+ AllowEmptyFolder: *s3opt.allowEmptyFolder,
+ AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
+ LocalFilerSocket: s3opt.localFilerSocket,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
@@ -191,19 +200,61 @@ func (s3opt *S3Options) startS3Server() bool {
httpS := &http.Server{Handler: router}
- listenAddress := fmt.Sprintf(":%d", *s3opt.port)
- s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
+ if *s3opt.portGrpc == 0 {
+ *s3opt.portGrpc = 10000 + *s3opt.port
+ }
+ if *s3opt.bindIp == "" {
+ *s3opt.bindIp = "localhost"
+ }
+
+ listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port)
+ s3ApiListener, s3ApiLocalListner, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second)
if err != nil {
glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
}
+ if len(*s3opt.auditLogConfig) > 0 {
+ s3err.InitAuditLog(*s3opt.auditLogConfig)
+ if s3err.Logger != nil {
+ defer s3err.Logger.Close()
+ }
+ }
+
+ // starting grpc server
+ grpcPort := *s3opt.portGrpc
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0)
+ if err != nil {
+ glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3"))
+ s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer)
+ reflection.Register(grpcS)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
+ go grpcS.Serve(grpcL)
+
if *s3opt.tlsPrivateKey != "" {
glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
+ if s3ApiLocalListner != nil {
+ go func() {
+ if err = httpS.ServeTLS(s3ApiLocalListner, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
+ glog.Fatalf("S3 API Server Fail to serve: %v", err)
+ }
+ }()
+ }
if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
} else {
glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
+ if s3ApiLocalListner != nil {
+ go func() {
+ if err = httpS.Serve(s3ApiLocalListner); err != nil {
+ glog.Fatalf("S3 API Server Fail to serve: %v", err)
+ }
+ }()
+ }
if err = httpS.Serve(s3ApiListener); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}