aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-23 00:30:02 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-23 00:30:02 -0700
commitc07bcd5065391441cdc97c05975e447999bab4b1 (patch)
tree0b5a4dd8409b52032467b91466ce66116d9a34a4
parent654a69ff52a0625db174d7851463e3cc464ffe5a (diff)
downloadseaweedfs-c07bcd5065391441cdc97c05975e447999bab4b1.tar.xz
seaweedfs-c07bcd5065391441cdc97c05975e447999bab4b1.zip
refactoring
-rw-r--r--weed/pb/filer_pb/filer_client.go95
-rw-r--r--weed/s3api/filer_multipart.go4
-rw-r--r--weed/s3api/filer_util.go98
-rw-r--r--weed/s3api/s3api_bucket_handlers.go6
4 files changed, 97 insertions, 106 deletions
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 100e997b2..1a92b452d 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -5,11 +5,18 @@ import (
"fmt"
"io"
"math"
+ "os"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
+var (
+ OS_UID = uint32(os.Getuid())
+ OS_GID = uint32(os.Getgid())
+)
+
type FilerClient interface {
WithFilerClient(fn func(SeaweedFilerClient) error) error
AdjustedUrl(hostAndPort string) string
@@ -50,15 +57,26 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) {
- err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+ return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
+
+}
+
+func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
+
+ return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+
+}
- lastEntryName := ""
+func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
+
+ err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
request := &ListEntriesRequest{
- Directory: string(fullDirPath),
- Prefix: prefix,
- StartFromFileName: lastEntryName,
- Limit: math.MaxUint32,
+ Directory: string(fullDirPath),
+ Prefix: prefix,
+ StartFromFileName: startFrom,
+ Limit: limit,
+ InclusiveStartFrom: inclusive,
}
glog.V(3).Infof("read directory: %v", request)
@@ -120,3 +138,68 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin
return
}
+
+func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
+ return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+
+ entry := &Entry{
+ Name: dirName,
+ IsDirectory: true,
+ Attributes: &FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0777 | os.ModeDir),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+
+ if fn != nil {
+ fn(entry)
+ }
+
+ request := &CreateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: entry,
+ }
+
+ glog.V(1).Infof("mkdir: %v", request)
+ if err := CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("mkdir %v: %v", request, err)
+ return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
+ }
+
+ return nil
+ })
+}
+
+func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
+ return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
+
+ entry := &Entry{
+ Name: fileName,
+ IsDirectory: false,
+ Attributes: &FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0770),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ Chunks: chunks,
+ }
+
+ request := &CreateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: entry,
+ }
+
+ glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
+ if err := CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("create file %v:%v", request, err)
+ return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
+ }
+
+ return nil
+ })
+}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 1350fb18e..e81461dd2 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -155,7 +155,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
},
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
+ entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, uint32(*input.MaxUploads))
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
@@ -190,7 +190,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
},
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts))
+ entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, ErrNoSuchUpload
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 2e738af50..51249cf39 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -3,115 +3,29 @@ package s3api
import (
"context"
"fmt"
- "io"
- "os"
"strings"
- "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- entry := &filer_pb.Entry{
- Name: dirName,
- IsDirectory: true,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(0777 | os.ModeDir),
- Uid: OS_UID,
- Gid: OS_GID,
- },
- }
-
- if fn != nil {
- fn(entry)
- }
-
- request := &filer_pb.CreateEntryRequest{
- Directory: parentDirectoryPath,
- Entry: entry,
- }
- glog.V(1).Infof("mkdir: %v", request)
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("mkdir %v: %v", request, err)
- return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
- }
+ return filer_pb.Mkdir(s3a, parentDirectoryPath, dirName, fn)
- return nil
- })
}
func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- entry := &filer_pb.Entry{
- Name: fileName,
- IsDirectory: false,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(0770),
- Uid: OS_UID,
- Gid: OS_GID,
- },
- Chunks: chunks,
- }
-
- request := &filer_pb.CreateEntryRequest{
- Directory: parentDirectoryPath,
- Entry: entry,
- }
- glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("create file %v:%v", request, err)
- return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
- }
+ return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks)
- return nil
- })
}
-func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) {
-
- err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) {
- request := &filer_pb.ListEntriesRequest{
- Directory: parentDirectoryPath,
- Prefix: prefix,
- StartFromFileName: startFrom,
- InclusiveStartFrom: inclusive,
- Limit: uint32(limit),
- }
-
- glog.V(4).Infof("read directory: %v", request)
- stream, err := client.ListEntries(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("read directory %v: %v", request, err)
- return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err)
- }
-
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- break
- } else {
- return recvErr
- }
- }
-
- entries = append(entries, resp.Entry)
-
- }
-
- return nil
- })
+ err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) {
+ entries = append(entries, entry)
+ }, startFrom, inclusive, limit)
return
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 02a01e74f..f1bfb2156 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -6,7 +6,6 @@ import (
"fmt"
"math"
"net/http"
- "os"
"time"
"github.com/aws/aws-sdk-go/aws"
@@ -17,11 +16,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
-var (
- OS_UID = uint32(os.Getuid())
- OS_GID = uint32(os.Getgid())
-)
-
type ListAllMyBucketsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"`
Owner *s3.Owner