aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis.yml50
-rw-r--r--weed/command/filer_copy.go245
-rw-r--r--weed/command/shell.go2
-rw-r--r--weed/command/upload.go2
-rw-r--r--weed/operation/compress.go39
-rw-r--r--weed/operation/upload_content.go20
-rw-r--r--weed/replication/sink/filersink/fetch_write.go13
-rw-r--r--weed/replication/source/filer_source.go12
-rw-r--r--weed/s3api/s3api_handlers.go13
-rw-r--r--weed/server/master_grpc_server.go2
-rw-r--r--weed/server/master_server_handlers_admin.go4
-rw-r--r--weed/shell/command_fs_cd.go54
-rw-r--r--weed/shell/command_fs_du.go16
-rw-r--r--weed/shell/command_fs_ls.go38
-rw-r--r--weed/shell/command_fs_tree.go37
-rw-r--r--weed/shell/command_volume_list.go63
-rw-r--r--weed/shell/commands.go54
-rw-r--r--weed/storage/needle_parse_multipart.go8
-rw-r--r--weed/topology/node.go45
-rw-r--r--weed/topology/rack.go2
-rw-r--r--weed/topology/topology_test.go10
-rw-r--r--weed/topology/volume_growth.go8
-rw-r--r--weed/topology/volume_growth_test.go2
-rw-r--r--weed/util/file_util_non_posix.go12
-rw-r--r--weed/util/file_util_posix.go11
-rw-r--r--weed/util/grpc_client_server.go5
-rw-r--r--weed/wdclient/masterclient.go12
27 files changed, 478 insertions, 301 deletions
diff --git a/.travis.yml b/.travis.yml
index e1a1fa31c..612f643e9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,19 +1,21 @@
sudo: false
language: go
go:
-- 1.10.x
-- 1.11.x
-- 1.12.x
-- tip
+ - 1.10.x
+ - 1.11.x
+ - 1.12.x
+ # - tip
before_install:
-- export PATH=/home/travis/gopath/bin:$PATH
+ - export PATH=/home/travis/gopath/bin:$PATH
install:
-- go get ./weed/...
+ - export CGO_ENABLED="0"
+ - go env
+ - go get -u ./weed/...
script:
-- go test ./weed/...
+ - go test ./weed/...
before_deploy:
- make release
@@ -23,23 +25,23 @@ deploy:
api_key:
secure: ERL986+ncQ8lwAJUYDrQ8s2/FxF/cyNIwJIFCqspnWxQgGNNyokET9HapmlPSxjpFRF0q6L2WCg9OY3mSVRq4oI6hg1igOQ12KlLyN71XSJ3c8w0Ay5ho48TQ9l3f3Iu97mntBCe9l0R9pnT8wj1VI8YJxloXwUMG2yeTjA9aBI=
file:
- - build/linux_arm.tar.gz
- - build/linux_arm64.tar.gz
- - build/linux_386.tar.gz
- - build/linux_amd64.tar.gz
- - build/darwin_amd64.tar.gz
- - build/windows_386.zip
- - build/windows_amd64.zip
- - build/freebsd_arm.tar.gz
- - build/freebsd_amd64.tar.gz
- - build/freebsd_386.tar.gz
- - build/netbsd_arm.tar.gz
- - build/netbsd_amd64.tar.gz
- - build/netbsd_386.tar.gz
- - build/openbsd_arm.tar.gz
- - build/openbsd_amd64.tar.gz
- - build/openbsd_386.tar.gz
+ - build/linux_arm.tar.gz
+ - build/linux_arm64.tar.gz
+ - build/linux_386.tar.gz
+ - build/linux_amd64.tar.gz
+ - build/darwin_amd64.tar.gz
+ - build/windows_386.zip
+ - build/windows_amd64.zip
+ - build/freebsd_arm.tar.gz
+ - build/freebsd_amd64.tar.gz
+ - build/freebsd_386.tar.gz
+ - build/netbsd_arm.tar.gz
+ - build/netbsd_amd64.tar.gz
+ - build/netbsd_386.tar.gz
+ - build/openbsd_arm.tar.gz
+ - build/openbsd_amd64.tar.gz
+ - build/openbsd_386.tar.gz
on:
tags: true
repo: chrislusf/seaweedfs
- go: tip
+ go: 1.12.x
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index dd763974c..777e52ab6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -1,30 +1,31 @@
package command
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/spf13/viper"
"google.golang.org/grpc"
+ "io"
"io/ioutil"
+ "net/http"
"net/url"
"os"
"path/filepath"
- "strings"
-
- "context"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "io"
- "net/http"
"strconv"
+ "strings"
+ "sync"
"time"
)
var (
- copy CopyOptions
+ copy CopyOptions
+ waitGroup sync.WaitGroup
)
type CopyOptions struct {
@@ -37,6 +38,7 @@ type CopyOptions struct {
maxMB *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
+ concurrency *int
}
func init() {
@@ -47,8 +49,9 @@ func init() {
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
+ copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
+ copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
}
var cmdCopy = &Command{
@@ -111,61 +114,135 @@ func runCopy(cmd *Command, args []string) bool {
go copy.masterClient.KeepConnectedToMaster()
copy.masterClient.WaitUntilConnected()
- for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(context.Background(), fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) {
- return false
+ if *cmdCopy.IsDebug {
+ util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
+ }
+
+ fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
+
+ ctx := context.Background()
+
+ go func() {
+ defer close(fileCopyTaskChan)
+ for _, fileOrDir := range fileOrDirs {
+ if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
+ break
+ }
}
+ }()
+ for i := 0; i < *copy.concurrency; i++ {
+ waitGroup.Add(1)
+ go func() {
+ defer waitGroup.Done()
+ worker := FileCopyWorker{
+ options: &copy,
+ filerHost: filerUrl.Host,
+ filerGrpcAddress: filerGrpcAddress,
+ }
+ if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
+ return
+ }
+ }()
}
+ waitGroup.Wait()
+
return true
}
-func doEachCopy(ctx context.Context, fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool {
- f, err := os.Open(fileOrDir)
- if err != nil {
- fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
- return false
- }
- defer f.Close()
+func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
- fi, err := f.Stat()
+ fi, err := os.Stat(fileOrDir)
if err != nil {
- fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
- return false
+ fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
+ return nil
}
mode := fi.Mode()
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- if !doEachCopy(ctx, fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") {
- return false
+ if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
+ return err
}
}
- return true
+ return nil
+ }
+
+ uid, gid := util.GetFileUidGid(fi)
+
+ fileCopyTaskChan <- FileCopyTask{
+ sourceLocation: fileOrDir,
+ destinationUrlPath: destPath,
+ fileSize: fi.Size(),
+ fileMode: fi.Mode(),
+ uid: uid,
+ gid: gid,
+ }
+
+ return nil
+}
+
+type FileCopyWorker struct {
+ options *CopyOptions
+ filerHost string
+ filerGrpcAddress string
+}
+
+func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
+ for task := range fileCopyTaskChan {
+ if err := worker.doEachCopy(ctx, task); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type FileCopyTask struct {
+ sourceLocation string
+ destinationUrlPath string
+ fileSize int64
+ fileMode os.FileMode
+ uid uint32
+ gid uint32
+}
+
+func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
+
+ f, err := os.Open(task.sourceLocation)
+ if err != nil {
+ fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
+ if _, ok := err.(*os.PathError); ok {
+ fmt.Printf("skipping %s\n", task.sourceLocation)
+ return nil
+ }
+ return err
}
+ defer f.Close()
// this is a regular file
- if *copy.include != "" {
- if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
- return true
+ if *worker.options.include != "" {
+ if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
+ return nil
}
}
// find the chunk count
- chunkSize := int64(*copy.maxMB * 1024 * 1024)
+ chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
- if chunkSize > 0 && fi.Size() > chunkSize {
- chunkCount = int(fi.Size()/chunkSize) + 1
+ if chunkSize > 0 && task.fileSize > chunkSize {
+ chunkCount = int(task.fileSize/chunkSize) + 1
}
if chunkCount == 1 {
- return uploadFileAsOne(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi)
+ return worker.uploadFileAsOne(ctx, task, f)
}
- return uploadFileInChunks(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
}
-func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool {
+func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
@@ -173,29 +250,27 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string,
var chunks []*filer_pb.FileChunk
- if fi.Size() > 0 {
+ if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{
+ assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ Ttl: *worker.options.ttl,
})
if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
}
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth)
if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
@@ -207,25 +282,25 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string,
ETag: uploadResult.ETag,
})
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
},
Chunks: chunks,
},
@@ -236,14 +311,13 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string,
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- return true
+ return nil
}
-func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -253,14 +327,14 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri
for i := int64(0); i < int64(chunkCount); i++ {
// assign a volume
- assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{
+ assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ Ttl: *worker.options.ttl,
})
if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err)
}
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
@@ -270,12 +344,10 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri
io.LimitReader(f, chunkSize),
false, "application/octet-stream", nil, assignResult.Auth)
if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
chunks = append(chunks, &filer_pb.FileChunk{
FileId: assignResult.Fid,
@@ -287,22 +359,22 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}
- if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
},
Chunks: chunks,
},
@@ -313,13 +385,12 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
- return true
+ return nil
}
func detectMimeType(f *os.File) string {
@@ -340,13 +411,9 @@ func detectMimeType(f *os.File) string {
func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(ctx, filerAddress, grpcDialOption)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(clientConn)
+ return fn(client)
+ }, filerAddress, grpcDialOption)
- return fn(client)
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 3216d5d48..95b62f0b5 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -33,7 +33,7 @@ func runShell(command *Command, args []string) bool {
shellOptions.FilerHost = "localhost"
shellOptions.FilerPort = 8888
- shellOptions.Directory = ""
+ shellOptions.Directory = "/"
shell.RunShell(shellOptions)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 80fc635c1..1271725ba 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -37,7 +37,7 @@ func init() {
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
+ upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit")
}
var cmdUpload = &Command{
diff --git a/weed/operation/compress.go b/weed/operation/compress.go
index fedc877dd..7190eeeb2 100644
--- a/weed/operation/compress.go
+++ b/weed/operation/compress.go
@@ -16,50 +16,63 @@ import (
*/
func IsGzippable(ext, mtype string, data []byte) bool {
+ shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype)
+ if iAmSure {
+ return shouldBeZipped
+ }
+
+ isMostlyText := util.IsText(data)
+
+ return isMostlyText
+}
+
+/*
+* Default more not to gzip since gzip can be done on client side.
+ */
+func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) {
+
// text
if strings.HasPrefix(mtype, "text/") {
- return true
+ return true, true
}
// images
switch ext {
case ".svg", ".bmp":
- return true
+ return true, true
}
if strings.HasPrefix(mtype, "image/") {
- return false
+ return false, true
}
// by file name extension
switch ext {
case ".zip", ".rar", ".gz", ".bz2", ".xz":
- return false
+ return false, true
case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
- return true
+ return true, true
case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp":
- return true
+ return true, true
case ".png", ".jpg", ".jpeg":
- return false
+ return false, true
}
// by mime type
if strings.HasPrefix(mtype, "application/") {
if strings.HasSuffix(mtype, "xml") {
- return true
+ return true, true
}
if strings.HasSuffix(mtype, "script") {
- return true
+ return true, true
}
}
- isMostlyText := util.IsText(data)
-
- return isMostlyText
+ return false, false
}
func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
- w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
+ w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
if _, err := w.Write(input); err != nil {
glog.V(2).Infoln("error compressing data:", err)
return nil, err
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 2276c67b7..dcab1a0ae 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,6 +2,8 @@ package operation
import (
"bytes"
+ "compress/flate"
+ "compress/gzip"
"encoding/json"
"errors"
"fmt"
@@ -39,10 +41,24 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+ contentIsGzipped := isGzipped
+ shouldGzipNow := false
+ if !isGzipped {
+ if shouldBeZipped, iAmSure := IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ shouldGzipNow = true
+ contentIsGzipped = true
+ }
+ }
return upload_content(uploadUrl, func(w io.Writer) (err error) {
- _, err = io.Copy(w, reader)
+ if shouldGzipNow {
+ gzWriter, _ := gzip.NewWriterLevel(w, flate.BestSpeed)
+ _, err = io.Copy(gzWriter, reader)
+ gzWriter.Close()
+ } else {
+ _, err = io.Copy(w, reader)
+ }
return
- }, filename, isGzipped, mtype, pairMap, jwt)
+ }, filename, contentIsGzipped, mtype, pairMap, jwt)
}
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 0f3473ff2..d24770e3d 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"strings"
"sync"
@@ -105,15 +106,11 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi
func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, fs.grpcDialOption)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.grpcAddress, fs.grpcDialOption)
- return fn(client)
}
func volumeId(fileId string) string {
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 3ab6c7261..d7b5ebc4d 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -91,15 +91,11 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri
func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, grpcDialOption)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.grpcAddress, fs.grpcDialOption)
- return fn(client)
}
func volumeId(fileId string) string {
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 5a63648ca..127be07e3 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
"net/http"
"net/url"
"time"
@@ -38,15 +39,11 @@ func encodeResponse(response interface{}) []byte {
func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(ctx, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", s3a.option.FilerGrpcAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
- return fn(client)
}
// If none of the http routes match respond with MethodNotAllowed
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 4c8ff5700..4ae2db030 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -63,7 +63,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
- int(heartbeat.MaxVolumeCount))
+ int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 95e55a497..4f0195084 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -68,8 +68,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
+ if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
} else {
count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
}
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index 13208a3f8..f14350f02 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -2,11 +2,7 @@ package shell
import (
"context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
- "strings"
)
func init() {
@@ -35,59 +31,23 @@ func (c *commandFsCd) Help() string {
func (c *commandFsCd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
- input := ""
- if len(args) > 0 {
- input = args[len(args)-1]
- }
+ input := findInputDirectory(args)
filerServer, filerPort, path, err := commandEnv.parseUrl(input)
if err != nil {
return err
}
- dir, name := filer2.FullPath(path).DirAndName()
- if strings.HasSuffix(path, "/") {
- if path == "/" {
- dir, name = "/", ""
- } else {
- dir, name = filer2.FullPath(path[0:len(path)-1]).DirAndName()
- }
+ if path == "/" {
+ commandEnv.option.FilerHost = filerServer
+ commandEnv.option.FilerPort = filerPort
+ commandEnv.option.Directory = "/"
+ return nil
}
ctx := context.Background()
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
-
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: dir,
- Prefix: name,
- StartFromFileName: name,
- InclusiveStartFrom: true,
- Limit: 1,
- })
- if listErr != nil {
- return listErr
- }
-
- if path == "" || path == "/" {
- return nil
- }
-
- if len(resp.Entries) == 0 {
- return fmt.Errorf("entry not found")
- }
-
- if resp.Entries[0].Name != name {
- println("path", path, "dir", dir, "name", name, "found", resp.Entries[0].Name)
- return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name)
- }
-
- if !resp.Entries[0].IsDirectory {
- return fmt.Errorf("not a directory")
- }
-
- return nil
- })
+ err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path)
if err == nil {
commandEnv.option.FilerHost = filerServer
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index 98e2eebd1..f305cabdc 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
- "strings"
)
func init() {
@@ -33,21 +32,18 @@ func (c *commandFsDu) Help() string {
func (c *commandFsDu) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, path, err := commandEnv.parseUrl(args[0])
+ filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- dir, name := filer2.FullPath(path).DirAndName()
- if strings.HasSuffix(path, "/") {
- if path == "/" {
- dir, name = "/", ""
- } else {
- dir, name = path[0 : len(path)-1], ""
- }
+ ctx := context.Background()
+
+ if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ path = path + "/"
}
- ctx := context.Background()
+ dir, name := filer2.FullPath(path).DirAndName()
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 7b8d1d0cc..93b86fa9f 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -36,41 +36,33 @@ func (c *commandFsLs) Do(args []string, commandEnv *commandEnv, writer io.Writer
var isLongFormat, showHidden bool
for _, arg := range args {
- switch arg {
- case "-a":
- showHidden = true
- case "-l":
- isLongFormat = true
+ if !strings.HasPrefix(arg, "-") {
+ break
}
- }
-
- input := ""
- if len(args) > 0 {
- input = args[len(args)-1]
- if strings.HasPrefix(input, "-") {
- input = ""
+ for _, t := range arg {
+ switch t {
+ case 'a':
+ showHidden = true
+ case 'l':
+ isLongFormat = true
+ }
}
}
+ input := findInputDirectory(args)
+
filerServer, filerPort, path, err := commandEnv.parseUrl(input)
if err != nil {
return err
}
- if input == "" && !strings.HasSuffix(path, "/") {
+
+ ctx := context.Background()
+
+ if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
path = path + "/"
}
dir, name := filer2.FullPath(path).DirAndName()
- // println("path", path, "dir", dir, "name", name)
- if strings.HasSuffix(path, "/") {
- if path == "/" {
- dir, name = "/", ""
- } else {
- dir, name = path[0 : len(path)-1], ""
- }
- }
-
- ctx := context.Background()
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index 805b17d2a..f1ffc9e4b 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -29,33 +29,33 @@ func (c *commandFsTree) Help() string {
func (c *commandFsTree) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, path, err := parseFilerUrl(args[len(args)-1])
+ filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
dir, name := filer2.FullPath(path).DirAndName()
- if strings.HasSuffix(path, "/") {
- if path == "/" {
- dir, name = "/", ""
- } else {
- dir, name = path[0:len(path)-1], ""
- }
- }
ctx := context.Background()
return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- return treeTraverseDirectory(ctx, writer, client, dir, name, 1000, newPrefix(), 0)
+ dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, dir, name, newPrefix(), -1)
+
+ if terr == nil {
+ fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount)
+ }
+
+ return terr
})
}
-func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int, prefix *Prefix, level int) (err error) {
+func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
paginatedCount := -1
startFromFileName := ""
+ paginateSize := 1000
for paginatedCount == -1 || paginatedCount == paginateSize {
resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
@@ -76,17 +76,29 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_p
}
for i, entry := range resp.Entries {
+
+ if level < 0 && name != "" {
+ if entry.Name != name {
+ break
+ }
+ }
+
// 0.1% wrong prefix here, but fixing it would need to paginate to the next batch first
isLast := paginatedCount < paginateSize && i == paginatedCount-1
fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name)
if entry.IsDirectory {
+ directoryCount++
subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
if dir == "/" {
subDir = "/" + entry.Name
}
- err = treeTraverseDirectory(ctx, writer, client, subDir, "", paginateSize, prefix, level+1)
+ dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, subDir, "", prefix, level+1)
+ directoryCount += dirCount
+ fileCount += fCount
+ err = terr
} else {
+ fileCount++
}
startFromFileName = entry.Name
@@ -114,6 +126,9 @@ func (p *Prefix) removeMarker(marker int) {
}
func (p *Prefix) getPrefix(level int, isLastChild bool) string {
var sb strings.Builder
+ if level < 0 {
+ return ""
+ }
for i := 0; i < level; i++ {
if _, ok := p.markers[i]; ok {
sb.WriteString("│")
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index f3f843d58..b3dca0d0b 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -42,30 +42,75 @@ func (c *commandVolumeList) Do(args []string, commandEnv *commandEnv, writer io.
return nil
}
-func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo) {
+func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo) statistics {
fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ var s statistics
for _, dc := range t.DataCenterInfos {
- writeDataCenterInfo(writer, dc)
+ s = s.plus(writeDataCenterInfo(writer, dc))
}
+ fmt.Fprintf(writer, "%+v \n", s)
+ return s
}
-func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) {
+func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics {
fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ var s statistics
for _, r := range t.RackInfos {
- writeRackInfo(writer, r)
+ s = s.plus(writeRackInfo(writer, r))
}
+ fmt.Fprintf(writer, " DataCenter %s %+v \n", t.Id, s)
+ return s
}
-func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) {
+func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics {
fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ var s statistics
for _, dn := range t.DataNodeInfos {
- writeDataNodeInfo(writer, dn)
+ s = s.plus(writeDataNodeInfo(writer, dn))
}
+ fmt.Fprintf(writer, " Rack %s %+v \n", t.Id, s)
+ return s
}
-func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) {
+func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics {
fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ var s statistics
for _, vi := range t.VolumeInfos {
- writeVolumeInformationMessage(writer, vi)
+ s = s.plus(writeVolumeInformationMessage(writer, vi))
}
+ fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s)
+ return s
}
-func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) {
+func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics {
fmt.Fprintf(writer, " volume %+v \n", t)
+ return newStatiscis(t)
+}
+
+type statistics struct {
+ Size uint64
+ FileCount uint64
+ DeletedFileCount uint64
+ DeletedBytes uint64
+}
+
+func newStatiscis(t *master_pb.VolumeInformationMessage) statistics {
+ return statistics{
+ Size: t.Size,
+ FileCount: t.FileCount,
+ DeletedFileCount: t.DeleteCount,
+ DeletedBytes: t.DeletedByteCount,
+ }
+}
+
+func (s statistics) plus(t statistics) statistics {
+ return statistics{
+ Size: s.Size + t.Size,
+ FileCount: s.FileCount + t.FileCount,
+ DeletedFileCount: s.DeletedFileCount + t.DeletedFileCount,
+ DeletedBytes: s.DeletedBytes + t.DeletedBytes,
+ }
+}
+
+func (s statistics) String() string {
+ if s.DeletedFileCount > 0 {
+ return fmt.Sprintf("total size:%d file_count:%d deleted_file:%d deleted_bytes:%d", s.Size, s.FileCount, s.DeletedFileCount, s.DeletedBytes)
+ }
+ return fmt.Sprintf("total size:%d file_count:%d", s.Size, s.FileCount)
}
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 2a262d913..50b70498d 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -1,7 +1,10 @@
package shell
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"io"
@@ -46,6 +49,46 @@ func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int6
return ce.option.FilerHost, ce.option.FilerPort, input, err
}
+func (ce *commandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+
+ return ce.checkDirectory(ctx,filerServer,filerPort,path) == nil
+
+}
+
+func (ce *commandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+
+ dir, name := filer2.FullPath(path).DirAndName()
+
+ return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+ Directory: dir,
+ Prefix: name,
+ StartFromFileName: name,
+ InclusiveStartFrom: true,
+ Limit: 1,
+ })
+ if listErr != nil {
+ return listErr
+ }
+
+ if len(resp.Entries) == 0 {
+ return fmt.Errorf("entry not found")
+ }
+
+ if resp.Entries[0].Name != name {
+ return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name)
+ }
+
+ if !resp.Entries[0].IsDirectory {
+ return fmt.Errorf("not a directory")
+ }
+
+ return nil
+ })
+
+}
+
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
if strings.HasPrefix(entryPath, "http") {
var u *url.URL
@@ -64,3 +107,14 @@ func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path
}
return
}
+
+func findInputDirectory(args []string) (input string) {
+ input = "."
+ if len(args) > 0 {
+ input = args[len(args)-1]
+ if strings.HasPrefix(input, "-") {
+ input = "."
+ }
+ }
+ return input
+}
diff --git a/weed/storage/needle_parse_multipart.go b/weed/storage/needle_parse_multipart.go
index e8d57ee38..3dba81fcf 100644
--- a/weed/storage/needle_parse_multipart.go
+++ b/weed/storage/needle_parse_multipart.go
@@ -88,10 +88,12 @@ func parseMultipart(r *http.Request) (
}
isGzipped = true
} else if operation.IsGzippable(ext, mtype, data) {
- if data, e = operation.GzipData(data); e != nil {
- return
+ if compressedData, err := operation.GzipData(data); err == nil {
+ if len(data) > len(compressedData) {
+ data = compressedData
+ isGzipped = true
+ }
}
- isGzipped = true
}
}
diff --git a/weed/topology/node.go b/weed/topology/node.go
index b7d2f79ec..db70c9734 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -5,6 +5,7 @@ import (
"math/rand"
"strings"
"sync"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -14,16 +15,16 @@ type NodeId string
type Node interface {
Id() NodeId
String() string
- FreeSpace() int
- ReserveOneVolume(r int) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+ FreeSpace() int64
+ ReserveOneVolume(r int64) (*DataNode, error)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64)
+ UpAdjustVolumeCountDelta(volumeCountDelta int64)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64)
UpAdjustMaxVolumeId(vid storage.VolumeId)
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
+ GetVolumeCount() int64
+ GetActiveVolumeCount() int64
+ GetMaxVolumeCount() int64
GetMaxVolumeId() storage.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -40,9 +41,9 @@ type Node interface {
}
type NodeImpl struct {
id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
+ volumeCount int64
+ activeVolumeCount int64
+ maxVolumeCount int64
parent Node
sync.RWMutex // lock children
children map[NodeId]Node
@@ -126,7 +127,7 @@ func (n *NodeImpl) String() string {
func (n *NodeImpl) Id() NodeId {
return n.id
}
-func (n *NodeImpl) FreeSpace() int {
+func (n *NodeImpl) FreeSpace() int64 {
return n.maxVolumeCount - n.volumeCount
}
func (n *NodeImpl) SetParent(node Node) {
@@ -146,7 +147,7 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
@@ -171,20 +172,20 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
return nil, errors.New("No free volume slot found!")
}
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.volumeCount, volumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative
+ atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta)
if n.parent != nil {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
@@ -200,13 +201,13 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
return n.maxVolumeId
}
-func (n *NodeImpl) GetVolumeCount() int {
+func (n *NodeImpl) GetVolumeCount() int64 {
return n.volumeCount
}
-func (n *NodeImpl) GetActiveVolumeCount() int {
+func (n *NodeImpl) GetActiveVolumeCount() int64 {
return n.activeVolumeCount
}
-func (n *NodeImpl) GetMaxVolumeCount() int {
+func (n *NodeImpl) GetMaxVolumeCount() int64 {
return n.maxVolumeCount
}
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index f8f8ce34a..932c1a804 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 07dc9c67b..a8bdec902 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -47,8 +47,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
}
{
@@ -71,13 +71,13 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
}
topo.SyncDataNodeRegistration(volumeMessages, dn)
- assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
- assert(t, "volumeCount", topo.volumeCount, volumeCount)
+ assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
+ assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
}
topo.UnRegisterDataNode(dn)
- assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0)
+ assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index ef39a1c01..514033ca1 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -105,7 +105,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
- if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
@@ -134,7 +134,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
- if node.FreeSpace() < rp.SameRackCount+1 {
+ if node.FreeSpace() < int64(rp.SameRackCount+1) {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
@@ -175,7 +175,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
- r := rand.Intn(rack.FreeSpace())
+ r := rand.Int63n(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
@@ -183,7 +183,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
}
}
for _, datacenter := range otherDataCenters {
- r := rand.Intn(datacenter.FreeSpace())
+ r := rand.Int63n(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index f983df1ec..1963cb928 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -101,7 +101,7 @@ func setup(topologyLayout string) *Topology {
Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64)))
}
}
}
diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go
new file mode 100644
index 000000000..ffcfef6d5
--- /dev/null
+++ b/weed/util/file_util_non_posix.go
@@ -0,0 +1,12 @@
+// +build linux darwin freebsd netbsd openbsd plan9 solaris zos
+
+package util
+
+import (
+ "os"
+ "syscall"
+)
+
+func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) {
+ return fi.Sys().(*syscall.Stat_t).Uid, fi.Sys().(*syscall.Stat_t).Gid
+}
diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go
new file mode 100644
index 000000000..22ca60b3b
--- /dev/null
+++ b/weed/util/file_util_posix.go
@@ -0,0 +1,11 @@
+// +build windows
+
+package util
+
+import (
+ "os"
+)
+
+func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) {
+ return 0, 0
+}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
index e5993aeab..5c08538dc 100644
--- a/weed/util/grpc_client_server.go
+++ b/weed/util/grpc_client_server.go
@@ -3,6 +3,7 @@ package util
import (
"context"
"fmt"
+ "net/http"
"strconv"
"strings"
"sync"
@@ -18,6 +19,10 @@ var (
grpcClientsLock sync.Mutex
)
+func init(){
+ http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 100
+}
+
func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
var options []grpc.ServerOption
options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 794471f7b..7a0bc9181 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -106,15 +106,11 @@ func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.Di
return fmt.Errorf("failed to parse master grpc %v", master)
}
- grpcConnection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", master, err)
- }
- defer grpcConnection.Close()
-
- client := master_pb.NewSeaweedClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := master_pb.NewSeaweedClient(grpcConnection)
+ return fn(ctx, client)
+ }, masterGrpcAddress, grpcDialOption)
- return fn(ctx, client)
}
func (mc *MasterClient) WithClient(ctx context.Context, fn func(client master_pb.SeaweedClient) error) error {