diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/autocomplete.go | 6 | ||||
| -rw-r--r-- | weed/command/backup.go | 2 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 6 | ||||
| -rw-r--r-- | weed/command/filer.go | 1 | ||||
| -rw-r--r-- | weed/command/s3.go | 23 | ||||
| -rw-r--r-- | weed/command/scaffold/security.toml | 5 | ||||
| -rw-r--r-- | weed/command/server.go | 1 | ||||
| -rw-r--r-- | weed/pb/Makefile | 1 | ||||
| -rw-r--r-- | weed/pb/s3.proto | 25 | ||||
| -rw-r--r-- | weed/pb/s3_pb/s3.pb.go | 209 | ||||
| -rw-r--r-- | weed/pb/s3_pb/s3_grpc.pb.go | 101 | ||||
| -rw-r--r-- | weed/s3api/auth_credentials.go | 6 | ||||
| -rw-r--r-- | weed/s3api/auth_credentials_subscribe.go | 2 | ||||
| -rw-r--r-- | weed/s3api/http/header.go | 9 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_copy_handlers.go | 124 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_copy_handlers_test.go | 426 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_server_grpc.go | 16 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 11 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 2 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 |
21 files changed, 958 insertions, 22 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go index 9a545a183..955ce4006 100644 --- a/weed/command/autocomplete.go +++ b/weed/command/autocomplete.go @@ -41,7 +41,7 @@ func AutocompleteMain(commands []*Command) bool { func installAutoCompletion() bool { if runtime.GOOS == "windows" { - fmt.Println("windows is not supported") + fmt.Println("Windows is not supported") return false } @@ -56,7 +56,7 @@ func installAutoCompletion() bool { func uninstallAutoCompletion() bool { if runtime.GOOS == "windows" { - fmt.Println("windows is not supported") + fmt.Println("Windows is not supported") return false } @@ -65,7 +65,7 @@ func uninstallAutoCompletion() bool { fmt.Printf("uninstall failed! %s\n", err) return false } - fmt.Printf("autocompletion is disable. Please restart your shell.\n") + fmt.Printf("autocompletion is disabled. Please restart your shell.\n") return true } diff --git a/weed/command/backup.go b/weed/command/backup.go index ba1b0d287..c43b0d351 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -120,7 +120,7 @@ func runBackup(cmd *Command, args []string) bool { } if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) { - if err = v.Compact2(30*1024*1024*1024, 0, nil); err != nil { + if err = v.Compact2(0, 0, nil); err != nil { fmt.Printf("Compact Volume before synchronizing %v\n", err) return true } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 82821f579..9f18cc5b9 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -74,14 +74,14 @@ func init() { var cmdBenchmark = &Command{ UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000", - Short: "benchmark on writing millions of files and read out", + Short: "benchmark by writing millions of files and reading them out", Long: `benchmark on an empty SeaweedFS file system. Two tests during benchmark: 1) write lots of small files to the system 2) read the files out - The file content is mostly zero, but no compression is done. + The file content is mostly zeros, but no compression is done. You can choose to only benchmark read or write. During write, the list of uploaded file ids is stored in "-list" specified file. @@ -468,7 +468,7 @@ func (s *stats) printStats() { timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) - fmt.Printf("Complete requests: %d\n", completed) + fmt.Printf("Completed requests: %d\n", completed) fmt.Printf("Failed requests: %d\n", failed) fmt.Printf("Total transferred: %d bytes\n", transferred) fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken) diff --git a/weed/command/filer.go b/weed/command/filer.go index 0935feb76..42de11f08 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -86,6 +86,7 @@ func init() { // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port") + filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") diff --git a/weed/command/s3.go b/weed/command/s3.go index c28f3016e..42e447d90 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "google.golang.org/grpc/reflection" "net/http" "time" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/gorilla/mux" @@ -27,6 +29,7 @@ type S3Options struct { filer *string bindIp *string port *int + portGrpc *int config *string domainName *string tlsPrivateKey *string @@ -43,6 +46,7 @@ func init() { s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") + s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file") @@ -179,7 +183,7 @@ func (s3opt *S3Options) startS3Server() bool { router := mux.NewRouter().SkipClean(true) - _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ + s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ Filer: filerAddress, Port: *s3opt.port, Config: *s3opt.config, @@ -196,6 +200,9 @@ func (s3opt *S3Options) startS3Server() bool { httpS := &http.Server{Handler: router} + if *s3opt.portGrpc == 0 { + *s3opt.portGrpc = 10000 + *s3opt.port + } if *s3opt.bindIp == "" { *s3opt.bindIp = "localhost" } @@ -213,6 +220,20 @@ func (s3opt *S3Options) startS3Server() bool { } } + // starting grpc server + grpcPort := *s3opt.portGrpc + grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0) + if err != nil { + glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3")) + s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer) + reflection.Register(grpcS) + if grpcLocalL != nil { + go grpcS.Serve(grpcLocalL) + } + go grpcS.Serve(grpcL) + if *s3opt.tlsPrivateKey != "" { glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port) if s3ApiLocalListner != nil { diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index 38a803dd6..e5452cdff 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -67,6 +67,11 @@ cert = "" key = "" allowed_commonNames = "" # comma-separated SSL certificate common names +[grpc.s3] +cert = "" +key = "" +allowed_commonNames = "" # comma-separated SSL certificate common names + [grpc.msg_broker] cert = "" key = "" diff --git a/weed/command/server.go b/weed/command/server.go index d26376c1a..4b6b6c642 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -133,6 +133,7 @@ func init() { serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") + s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") diff --git a/weed/pb/Makefile b/weed/pb/Makefile index 954b4cb98..a8992bde2 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -9,6 +9,7 @@ gen: protoc remote.proto --go_out=./remote_pb --go-grpc_out=./remote_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative + protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative # protoc filer.proto --java_out=../../other/java/client/src/main/java cp filer.proto ../../other/java/client/src/main/proto diff --git a/weed/pb/s3.proto b/weed/pb/s3.proto new file mode 100644 index 000000000..4f129b817 --- /dev/null +++ b/weed/pb/s3.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package messaging_pb; + +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"; +option java_package = "seaweedfs.client"; +option java_outer_classname = "S3Proto"; + +////////////////////////////////////////////////// + +service SeaweedS3 { + + rpc Configure (S3ConfigureRequest) returns (S3ConfigureResponse) { + } + +} + +////////////////////////////////////////////////// + +message S3ConfigureRequest { + bytes s3_configuration_file_content = 1; +} + +message S3ConfigureResponse { +} diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go new file mode 100644 index 000000000..53f174f02 --- /dev/null +++ b/weed/pb/s3_pb/s3.pb.go @@ -0,0 +1,209 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: s3.proto + +package s3_pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type S3ConfigureRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + S3ConfigurationFileContent []byte `protobuf:"bytes,1,opt,name=s3_configuration_file_content,json=s3ConfigurationFileContent,proto3" json:"s3_configuration_file_content,omitempty"` +} + +func (x *S3ConfigureRequest) Reset() { + *x = S3ConfigureRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_s3_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *S3ConfigureRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*S3ConfigureRequest) ProtoMessage() {} + +func (x *S3ConfigureRequest) ProtoReflect() protoreflect.Message { + mi := &file_s3_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use S3ConfigureRequest.ProtoReflect.Descriptor instead. +func (*S3ConfigureRequest) Descriptor() ([]byte, []int) { + return file_s3_proto_rawDescGZIP(), []int{0} +} + +func (x *S3ConfigureRequest) GetS3ConfigurationFileContent() []byte { + if x != nil { + return x.S3ConfigurationFileContent + } + return nil +} + +type S3ConfigureResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *S3ConfigureResponse) Reset() { + *x = S3ConfigureResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_s3_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *S3ConfigureResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*S3ConfigureResponse) ProtoMessage() {} + +func (x *S3ConfigureResponse) ProtoReflect() protoreflect.Message { + mi := &file_s3_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use S3ConfigureResponse.ProtoReflect.Descriptor instead. +func (*S3ConfigureResponse) Descriptor() ([]byte, []int) { + return file_s3_proto_rawDescGZIP(), []int{1} +} + +var File_s3_proto protoreflect.FileDescriptor + +var file_s3_proto_rawDesc = []byte{ + 0x0a, 0x08, 0x73, 0x33, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x22, 0x57, 0x0a, 0x12, 0x53, 0x33, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x41, + 0x0a, 0x1d, 0x73, 0x33, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x1a, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x5f, 0x0a, 0x09, 0x53, 0x65, 0x61, 0x77, + 0x65, 0x65, 0x64, 0x53, 0x33, 0x12, 0x52, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x65, 0x12, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x49, 0x0a, 0x10, 0x73, 0x65, 0x61, + 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x07, 0x53, + 0x33, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, + 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73, + 0x33, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_s3_proto_rawDescOnce sync.Once + file_s3_proto_rawDescData = file_s3_proto_rawDesc +) + +func file_s3_proto_rawDescGZIP() []byte { + file_s3_proto_rawDescOnce.Do(func() { + file_s3_proto_rawDescData = protoimpl.X.CompressGZIP(file_s3_proto_rawDescData) + }) + return file_s3_proto_rawDescData +} + +var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_s3_proto_goTypes = []interface{}{ + (*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest + (*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse +} +var file_s3_proto_depIdxs = []int32{ + 0, // 0: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest + 1, // 1: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_s3_proto_init() } +func file_s3_proto_init() { + if File_s3_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_s3_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*S3ConfigureRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_s3_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*S3ConfigureResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_s3_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_s3_proto_goTypes, + DependencyIndexes: file_s3_proto_depIdxs, + MessageInfos: file_s3_proto_msgTypes, + }.Build() + File_s3_proto = out.File + file_s3_proto_rawDesc = nil + file_s3_proto_goTypes = nil + file_s3_proto_depIdxs = nil +} diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go new file mode 100644 index 000000000..1bc956be6 --- /dev/null +++ b/weed/pb/s3_pb/s3_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package s3_pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SeaweedS3Client is the client API for SeaweedS3 service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SeaweedS3Client interface { + Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) +} + +type seaweedS3Client struct { + cc grpc.ClientConnInterface +} + +func NewSeaweedS3Client(cc grpc.ClientConnInterface) SeaweedS3Client { + return &seaweedS3Client{cc} +} + +func (c *seaweedS3Client) Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) { + out := new(S3ConfigureResponse) + err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedS3/Configure", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SeaweedS3Server is the server API for SeaweedS3 service. +// All implementations must embed UnimplementedSeaweedS3Server +// for forward compatibility +type SeaweedS3Server interface { + Configure(context.Context, *S3ConfigureRequest) (*S3ConfigureResponse, error) + mustEmbedUnimplementedSeaweedS3Server() +} + +// UnimplementedSeaweedS3Server must be embedded to have forward compatible implementations. +type UnimplementedSeaweedS3Server struct { +} + +func (UnimplementedSeaweedS3Server) Configure(context.Context, *S3ConfigureRequest) (*S3ConfigureResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Configure not implemented") +} +func (UnimplementedSeaweedS3Server) mustEmbedUnimplementedSeaweedS3Server() {} + +// UnsafeSeaweedS3Server may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SeaweedS3Server will +// result in compilation errors. +type UnsafeSeaweedS3Server interface { + mustEmbedUnimplementedSeaweedS3Server() +} + +func RegisterSeaweedS3Server(s grpc.ServiceRegistrar, srv SeaweedS3Server) { + s.RegisterService(&SeaweedS3_ServiceDesc, srv) +} + +func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(S3ConfigureRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedS3Server).Configure(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/messaging_pb.SeaweedS3/Configure", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedS3Server).Configure(ctx, req.(*S3ConfigureRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SeaweedS3_ServiceDesc is the grpc.ServiceDesc for SeaweedS3 service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SeaweedS3_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "messaging_pb.SeaweedS3", + HandlerType: (*SeaweedS3Server)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Configure", + Handler: _SeaweedS3_Configure_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "s3.proto", +} diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 6a7d83919..53a55617f 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -91,7 +91,7 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3A if err != nil { return fmt.Errorf("read S3 config: %v", err) } - return iam.loadS3ApiConfigurationFromBytes(content) + return iam.LoadS3ApiConfigurationFromBytes(content) } func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error { @@ -100,10 +100,10 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName str glog.Warningf("fail to read %s : %v", fileName, readErr) return fmt.Errorf("fail to read %s : %v", fileName, readErr) } - return iam.loadS3ApiConfigurationFromBytes(content) + return iam.LoadS3ApiConfigurationFromBytes(content) } -func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error { +func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []byte) error { s3ApiConfiguration := &iam_pb.S3ApiConfiguration{} if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil { glog.Warningf("unmarshal error: %v", err) diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go index bd0b1016d..2cea739c6 100644 --- a/weed/s3api/auth_credentials_subscribe.go +++ b/weed/s3api/auth_credentials_subscribe.go @@ -23,7 +23,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la dir = message.NewParentPath } if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile { - if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil { + if err := s3a.iam.LoadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil { return err } glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile) diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go index d63d50443..30fc8eefa 100644 --- a/weed/s3api/http/header.go +++ b/weed/s3api/http/header.go @@ -28,11 +28,14 @@ const ( AmzStorageClass = "x-amz-storage-class" // S3 user-defined metadata - AmzUserMetaPrefix = "X-Amz-Meta-" + AmzUserMetaPrefix = "X-Amz-Meta-" + AmzUserMetaDirective = "X-Amz-Metadata-Directive" // S3 object tagging - AmzObjectTagging = "X-Amz-Tagging" - AmzTagCount = "x-amz-tagging-count" + AmzObjectTagging = "X-Amz-Tagging" + AmzObjectTaggingPrefix = "X-Amz-Tagging-" + AmzObjectTaggingDirective = "X-Amz-Tagging-Directive" + AmzTagCount = "x-amz-tagging-count" ) // Non-Standard S3 HTTP request constants diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go index f62db9c31..c44ca7ddf 100644 --- a/weed/s3api/s3api_object_copy_handlers.go +++ b/weed/s3api/s3api_object_copy_handlers.go @@ -3,9 +3,10 @@ package s3api import ( "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + headers "github.com/chrislusf/seaweedfs/weed/s3api/http" xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/s3api/s3err" - weed_server "github.com/chrislusf/seaweedfs/weed/server" + "modernc.org/strutil" "net/http" "net/url" "strconv" @@ -15,6 +16,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +const ( + DirectiveCopy = "COPY" + DirectiveReplace = "REPLACE" +) + func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { dstBucket, dstObject := xhttp.GetBucketAndObject(r) @@ -30,7 +36,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject) - if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) { + replaceMeta, replaceTagging := replaceDirective(r.Header) + + if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) { fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)) dir, name := fullPath.DirAndName() entry, err := s3a.getEntry(dir, name) @@ -38,7 +46,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return } - entry.Extended = weed_server.SaveAmzMetaData(r, entry.Extended, isReplace(r)) + entry.Extended = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) err = s3a.touch(dir, name, entry) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) @@ -80,6 +88,11 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } defer util.CloseResponse(resp) + tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name) + if tagErr != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) + return + } glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl) etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body) @@ -182,6 +195,107 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } -func isReplace(r *http.Request) bool { - return r.Header.Get("X-Amz-Metadata-Directive") == "REPLACE" +func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) { + return reqHeader.Get(headers.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(headers.AmzObjectTaggingDirective) == DirectiveReplace +} + +func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) { + if sc := reqHeader.Get(xhttp.AmzStorageClass); len(sc) == 0 { + if sc := existing[xhttp.AmzStorageClass]; len(sc) > 0 { + reqHeader[xhttp.AmzStorageClass] = sc + } + } + + if !replaceMeta { + for header, _ := range reqHeader { + if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) { + delete(reqHeader, header) + } + } + for k, v := range existing { + if strings.HasPrefix(k, xhttp.AmzUserMetaPrefix) { + reqHeader[k] = v + } + } + } + + if !replaceTagging { + for header, _ := range reqHeader { + if strings.HasPrefix(header, xhttp.AmzObjectTagging) { + delete(reqHeader, header) + } + } + + found := false + for k, _ := range existing { + if strings.HasPrefix(k, xhttp.AmzObjectTaggingPrefix) { + found = true + break + } + } + + if found { + tags, err := getTags(dir, name) + if err != nil { + return err + } + + var tagArr []string + for k, v := range tags { + tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v)) + } + tagStr := strutil.JoinFields(tagArr, "&") + reqHeader.Set(xhttp.AmzObjectTagging, tagStr) + } + } + return +} + +func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte) { + metadata = make(map[string][]byte) + + if sc := existing[xhttp.AmzStorageClass]; len(sc) > 0 { + metadata[xhttp.AmzStorageClass] = sc + } + if sc := reqHeader.Get(xhttp.AmzStorageClass); len(sc) > 0 { + metadata[xhttp.AmzStorageClass] = []byte(sc) + } + + if replaceMeta { + for header, values := range reqHeader { + if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) { + for _, value := range values { + metadata[header] = []byte(value) + } + } + } + } else { + for k, v := range existing { + if strings.HasPrefix(k, xhttp.AmzUserMetaPrefix) { + metadata[k] = v + } + } + } + + if replaceTagging { + if tags := reqHeader.Get(xhttp.AmzObjectTagging); tags != "" { + for _, v := range strings.Split(tags, "&") { + tag := strings.Split(v, "=") + if len(tag) == 2 { + metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + } else if len(tag) == 1 { + metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = nil + } + } + } + } else { + for k, v := range existing { + if strings.HasPrefix(k, xhttp.AmzObjectTagging) { + metadata[k] = v + } + } + delete(metadata, xhttp.AmzTagCount) + } + + return } diff --git a/weed/s3api/s3api_object_copy_handlers_test.go b/weed/s3api/s3api_object_copy_handlers_test.go new file mode 100644 index 000000000..d2c8e488b --- /dev/null +++ b/weed/s3api/s3api_object_copy_handlers_test.go @@ -0,0 +1,426 @@ +package s3api + +import ( + "fmt" + headers "github.com/chrislusf/seaweedfs/weed/s3api/http" + "net/http" + "reflect" + "sort" + "strings" + "testing" +) + +type H map[string]string + +func (h H) String() string { + pairs := make([]string, 0, len(h)) + for k, v := range h { + pairs = append(pairs, fmt.Sprintf("%s : %s", k, v)) + } + sort.Strings(pairs) + join := strings.Join(pairs, "\n") + return "\n" + join + "\n" +} + +var processMetadataTestCases = []struct { + caseId int + request H + existing H + getTags H + want H +}{ + { + 201, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging": "A=B&a=b&type=existing", + }, + }, + { + 202, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=existing", + headers.AmzUserMetaDirective: DirectiveReplace, + }, + }, + + { + 203, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 204, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 205, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{}, + H{}, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 206, + H{ + "User-Agent": "firefox", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, + + { + 207, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-Type": "existing", + }, + H{ + "A": "B", + "a": "b", + "type": "existing", + }, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + }, +} +var processMetadataBytesTestCases = []struct { + caseId int + request H + existing H + want H +}{ + { + 101, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + }, + + { + 102, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + }, + + { + 103, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, + + { + 104, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, + + { + 105, + H{ + "User-Agent": "firefox", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{ + "X-Amz-Meta-My-Meta": "existing", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "existing", + }, + H{}, + }, + + { + 107, + H{ + "User-Agent": "firefox", + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging": "A=B&a=b&type=request", + headers.AmzUserMetaDirective: DirectiveReplace, + headers.AmzObjectTaggingDirective: DirectiveReplace, + }, + H{}, + H{ + "X-Amz-Meta-My-Meta": "request", + "X-Amz-Tagging-A": "B", + "X-Amz-Tagging-a": "b", + "X-Amz-Tagging-type": "request", + }, + }, +} + +func TestProcessMetadata(t *testing.T) { + for _, tc := range processMetadataTestCases { + reqHeader := transferHToHeader(tc.request) + existing := transferHToHeader(tc.existing) + replaceMeta, replaceTagging := replaceDirective(reqHeader) + + err := processMetadata(reqHeader, existing, replaceMeta, replaceTagging, func(_ string, _ string) (tags map[string]string, err error) { + return tc.getTags, nil + }, "", "") + if err != nil { + t.Error(err) + } + + result := transferHeaderToH(reqHeader) + fmtTagging(result, tc.want) + + if !reflect.DeepEqual(result, tc.want) { + t.Error(fmt.Errorf("\n### CaseID: %d ###"+ + "\nRequest:%v"+ + "\nExisting:%v"+ + "\nGetTags:%v"+ + "\nWant:%v"+ + "\nActual:%v", + tc.caseId, tc.request, tc.existing, tc.getTags, tc.want, result)) + } + } +} + +func TestProcessMetadataBytes(t *testing.T) { + for _, tc := range processMetadataBytesTestCases { + reqHeader := transferHToHeader(tc.request) + existing := transferHToBytesArr(tc.existing) + replaceMeta, replaceTagging := replaceDirective(reqHeader) + extends := processMetadataBytes(reqHeader, existing, replaceMeta, replaceTagging) + + result := transferBytesArrToH(extends) + fmtTagging(result, tc.want) + + if !reflect.DeepEqual(result, tc.want) { + t.Error(fmt.Errorf("\n### CaseID: %d ###"+ + "\nRequest:%v"+ + "\nExisting:%v"+ + "\nWant:%v"+ + "\nActual:%v", + tc.caseId, tc.request, tc.existing, tc.want, result)) + } + } +} + +func fmtTagging(maps ...map[string]string) { + for _, m := range maps { + if tagging := m[headers.AmzObjectTagging]; len(tagging) > 0 { + split := strings.Split(tagging, "&") + sort.Strings(split) + m[headers.AmzObjectTagging] = strings.Join(split, "&") + } + } +} + +func transferHToHeader(data map[string]string) http.Header { + header := http.Header{} + for k, v := range data { + header.Add(k, v) + } + return header +} + +func transferHToBytesArr(data map[string]string) map[string][]byte { + m := make(map[string][]byte, len(data)) + for k, v := range data { + m[k] = []byte(v) + } + return m +} + +func transferBytesArrToH(data map[string][]byte) H { + m := make(map[string]string, len(data)) + for k, v := range data { + m[k] = string(v) + } + return m +} + +func transferHeaderToH(data map[string][]string) H { + m := make(map[string]string, len(data)) + for k, v := range data { + m[k] = v[len(v)-1] + } + return m +} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index b0b8e27e4..657fa8171 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -3,6 +3,7 @@ package s3api import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" "net" "net/http" "strings" @@ -31,6 +32,7 @@ type S3ApiServerOption struct { } type S3ApiServer struct { + s3_pb.UnimplementedSeaweedS3Server option *S3ApiServerOption iam *IdentityAccessManagement randomClientId int32 diff --git a/weed/s3api/s3api_server_grpc.go b/weed/s3api/s3api_server_grpc.go new file mode 100644 index 000000000..e93d0056f --- /dev/null +++ b/weed/s3api/s3api_server_grpc.go @@ -0,0 +1,16 @@ +package s3api + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" +) + +func (s3a *S3ApiServer) Configure(ctx context.Context, request *s3_pb.S3ConfigureRequest) (*s3_pb.S3ConfigureResponse, error) { + + if err := s3a.iam.LoadS3ApiConfigurationFromBytes(request.S3ConfigurationFileContent); err != nil { + return nil, err + } + + return &s3_pb.S3ConfigureResponse{}, nil + +} diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index c4bef5925..2afcd9cba 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -200,6 +200,17 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr } } + collectionIsMismatch := false + for _, volumeReplica := range replicas { + if volumeReplica.info.Collection != replica.info.Collection { + fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection) + collectionIsMismatch = true + } + } + if collectionIsMismatch { + continue + } + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) if !takeAction { diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 584ce722b..2b1daf97c 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -68,7 +68,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"") findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler") findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler") - applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer") + applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer. Currently this only works with default filerGroup.") c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") diff --git a/weed/util/constants.go b/weed/util/constants.go index 66bfca982..03d2b395e 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION_NUMBER = fmt.Sprintf("%.02f", 3.02) + VERSION_NUMBER = fmt.Sprintf("%.02f", 3.04) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" ) |
