diff options
27 files changed, 478 insertions, 301 deletions
diff --git a/.travis.yml b/.travis.yml index e1a1fa31c..612f643e9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,19 +1,21 @@ sudo: false language: go go: -- 1.10.x -- 1.11.x -- 1.12.x -- tip + - 1.10.x + - 1.11.x + - 1.12.x + # - tip before_install: -- export PATH=/home/travis/gopath/bin:$PATH + - export PATH=/home/travis/gopath/bin:$PATH install: -- go get ./weed/... + - export CGO_ENABLED="0" + - go env + - go get -u ./weed/... script: -- go test ./weed/... + - go test ./weed/... before_deploy: - make release @@ -23,23 +25,23 @@ deploy: api_key: secure: ERL986+ncQ8lwAJUYDrQ8s2/FxF/cyNIwJIFCqspnWxQgGNNyokET9HapmlPSxjpFRF0q6L2WCg9OY3mSVRq4oI6hg1igOQ12KlLyN71XSJ3c8w0Ay5ho48TQ9l3f3Iu97mntBCe9l0R9pnT8wj1VI8YJxloXwUMG2yeTjA9aBI= file: - - build/linux_arm.tar.gz - - build/linux_arm64.tar.gz - - build/linux_386.tar.gz - - build/linux_amd64.tar.gz - - build/darwin_amd64.tar.gz - - build/windows_386.zip - - build/windows_amd64.zip - - build/freebsd_arm.tar.gz - - build/freebsd_amd64.tar.gz - - build/freebsd_386.tar.gz - - build/netbsd_arm.tar.gz - - build/netbsd_amd64.tar.gz - - build/netbsd_386.tar.gz - - build/openbsd_arm.tar.gz - - build/openbsd_amd64.tar.gz - - build/openbsd_386.tar.gz + - build/linux_arm.tar.gz + - build/linux_arm64.tar.gz + - build/linux_386.tar.gz + - build/linux_amd64.tar.gz + - build/darwin_amd64.tar.gz + - build/windows_386.zip + - build/windows_amd64.zip + - build/freebsd_arm.tar.gz + - build/freebsd_amd64.tar.gz + - build/freebsd_386.tar.gz + - build/netbsd_arm.tar.gz + - build/netbsd_amd64.tar.gz + - build/netbsd_386.tar.gz + - build/openbsd_arm.tar.gz + - build/openbsd_amd64.tar.gz + - build/openbsd_386.tar.gz on: tags: true repo: chrislusf/seaweedfs - go: tip + go: 1.12.x diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index dd763974c..777e52ab6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -1,30 +1,31 @@ package command import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/spf13/viper" "google.golang.org/grpc" + "io" "io/ioutil" + "net/http" "net/url" "os" "path/filepath" - "strings" - - "context" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "io" - "net/http" "strconv" + "strings" + "sync" "time" ) var ( - copy CopyOptions + copy CopyOptions + waitGroup sync.WaitGroup ) type CopyOptions struct { @@ -37,6 +38,7 @@ type CopyOptions struct { maxMB *int grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient + concurrency *int } func init() { @@ -47,8 +49,9 @@ func init() { copy.replication = cmdCopy.Flag.String("replication", "", "replication type") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") + copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") } var cmdCopy = &Command{ @@ -111,61 +114,135 @@ func runCopy(cmd *Command, args []string) bool { go copy.masterClient.KeepConnectedToMaster() copy.masterClient.WaitUntilConnected() - for _, fileOrDir := range fileOrDirs { - if !doEachCopy(context.Background(), fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) { - return false + if *cmdCopy.IsDebug { + util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") + } + + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency) + + ctx := context.Background() + + go func() { + defer close(fileCopyTaskChan) + for _, fileOrDir := range fileOrDirs { + if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err) + break + } } + }() + for i := 0; i < *copy.concurrency; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + worker := FileCopyWorker{ + options: ©, + filerHost: filerUrl.Host, + filerGrpcAddress: filerGrpcAddress, + } + if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) + return + } + }() } + waitGroup.Wait() + return true } -func doEachCopy(ctx context.Context, fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool { - f, err := os.Open(fileOrDir) - if err != nil { - fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) - return false - } - defer f.Close() +func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error { - fi, err := f.Stat() + fi, err := os.Stat(fileOrDir) if err != nil { - fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err) - return false + fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err) + return nil } mode := fi.Mode() if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(ctx, fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") { - return false + if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { + return err } } - return true + return nil + } + + uid, gid := util.GetFileUidGid(fi) + + fileCopyTaskChan <- FileCopyTask{ + sourceLocation: fileOrDir, + destinationUrlPath: destPath, + fileSize: fi.Size(), + fileMode: fi.Mode(), + uid: uid, + gid: gid, + } + + return nil +} + +type FileCopyWorker struct { + options *CopyOptions + filerHost string + filerGrpcAddress string +} + +func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error { + for task := range fileCopyTaskChan { + if err := worker.doEachCopy(ctx, task); err != nil { + return err + } + } + return nil +} + +type FileCopyTask struct { + sourceLocation string + destinationUrlPath string + fileSize int64 + fileMode os.FileMode + uid uint32 + gid uint32 +} + +func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error { + + f, err := os.Open(task.sourceLocation) + if err != nil { + fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err) + if _, ok := err.(*os.PathError); ok { + fmt.Printf("skipping %s\n", task.sourceLocation) + return nil + } + return err } + defer f.Close() // this is a regular file - if *copy.include != "" { - if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok { - return true + if *worker.options.include != "" { + if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok { + return nil } } // find the chunk count - chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkSize := int64(*worker.options.maxMB * 1024 * 1024) chunkCount := 1 - if chunkSize > 0 && fi.Size() > chunkSize { - chunkCount = int(fi.Size()/chunkSize) + 1 + if chunkSize > 0 && task.fileSize > chunkSize { + chunkCount = int(task.fileSize/chunkSize) + 1 } if chunkCount == 1 { - return uploadFileAsOne(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi) + return worker.uploadFileAsOne(ctx, task, f) } - return uploadFileInChunks(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize) + return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize) } -func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool { +func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error { // upload the file content fileName := filepath.Base(f.Name()) @@ -173,29 +250,27 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, var chunks []*filer_pb.FileChunk - if fi.Size() > 0 { + if task.fileSize > 0 { // assign a volume - assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) @@ -207,25 +282,25 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, ETag: uploadResult.ETag, }) - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, @@ -236,14 +311,13 @@ func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - return true + return nil } -func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -253,14 +327,14 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri for i := int64(0); i < int64(chunkCount); i++ { // assign a volume - assignResult, err := operation.Assign(copy.masterClient.GetMaster(), grpcDialOption, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %s: %v\n", *worker.options.master, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid @@ -270,12 +344,10 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri io.LimitReader(f, chunkSize), false, "application/octet-stream", nil, assignResult.Auth) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } chunks = append(chunks, &filer_pb.FileChunk{ FileId: assignResult.Fid, @@ -287,22 +359,22 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, @@ -313,13 +385,12 @@ func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress stri } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) - return true + return nil } func detectMimeType(f *os.File) string { @@ -340,13 +411,9 @@ func detectMimeType(f *os.File) string { func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(ctx, filerAddress, grpcDialOption) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", filerAddress, err) - } - defer grpcConnection.Close() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(clientConn) + return fn(client) + }, filerAddress, grpcDialOption) - return fn(client) } diff --git a/weed/command/shell.go b/weed/command/shell.go index 3216d5d48..95b62f0b5 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -33,7 +33,7 @@ func runShell(command *Command, args []string) bool { shellOptions.FilerHost = "localhost" shellOptions.FilerPort = 8888 - shellOptions.Directory = "" + shellOptions.Directory = "/" shell.RunShell(shellOptions) diff --git a/weed/command/upload.go b/weed/command/upload.go index 80fc635c1..1271725ba 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -37,7 +37,7 @@ func init() { upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name") upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name") upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") + upload.maxMB = cmdUpload.Flag.Int("maxMB", 32, "split files larger than the limit") } var cmdUpload = &Command{ diff --git a/weed/operation/compress.go b/weed/operation/compress.go index fedc877dd..7190eeeb2 100644 --- a/weed/operation/compress.go +++ b/weed/operation/compress.go @@ -16,50 +16,63 @@ import ( */ func IsGzippable(ext, mtype string, data []byte) bool { + shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype) + if iAmSure { + return shouldBeZipped + } + + isMostlyText := util.IsText(data) + + return isMostlyText +} + +/* +* Default more not to gzip since gzip can be done on client side. + */ +func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) { + // text if strings.HasPrefix(mtype, "text/") { - return true + return true, true } // images switch ext { case ".svg", ".bmp": - return true + return true, true } if strings.HasPrefix(mtype, "image/") { - return false + return false, true } // by file name extension switch ext { case ".zip", ".rar", ".gz", ".bz2", ".xz": - return false + return false, true case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json": - return true + return true, true case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp": - return true + return true, true case ".png", ".jpg", ".jpeg": - return false + return false, true } // by mime type if strings.HasPrefix(mtype, "application/") { if strings.HasSuffix(mtype, "xml") { - return true + return true, true } if strings.HasSuffix(mtype, "script") { - return true + return true, true } } - isMostlyText := util.IsText(data) - - return isMostlyText + return false, false } func GzipData(input []byte) ([]byte, error) { buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) + w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) if _, err := w.Write(input); err != nil { glog.V(2).Infoln("error compressing data:", err) return nil, err diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 2276c67b7..dcab1a0ae 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,6 +2,8 @@ package operation import ( "bytes" + "compress/flate" + "compress/gzip" "encoding/json" "errors" "fmt" @@ -39,10 +41,24 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") // Upload sends a POST request to a volume server to upload the content func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { + contentIsGzipped := isGzipped + shouldGzipNow := false + if !isGzipped { + if shouldBeZipped, iAmSure := IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { + shouldGzipNow = true + contentIsGzipped = true + } + } return upload_content(uploadUrl, func(w io.Writer) (err error) { - _, err = io.Copy(w, reader) + if shouldGzipNow { + gzWriter, _ := gzip.NewWriterLevel(w, flate.BestSpeed) + _, err = io.Copy(gzWriter, reader) + gzWriter.Close() + } else { + _, err = io.Copy(w, reader) + } return - }, filename, isGzipped, mtype, pairMap, jwt) + }, filename, contentIsGzipped, mtype, pairMap, jwt) } func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 0f3473ff2..d24770e3d 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -3,6 +3,7 @@ package filersink import ( "context" "fmt" + "google.golang.org/grpc" "strings" "sync" @@ -105,15 +106,11 @@ func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.Fi func (fs *FilerSink) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, fs.grpcDialOption) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) - } - defer grpcConnection.Close() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, fs.grpcAddress, fs.grpcDialOption) - return fn(client) } func volumeId(fileId string) string { diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 3ab6c7261..d7b5ebc4d 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -91,15 +91,11 @@ func (fs *FilerSource) ReadPart(ctx context.Context, part string) (filename stri func (fs *FilerSource) withFilerClient(ctx context.Context, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(ctx, fs.grpcAddress, grpcDialOption) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) - } - defer grpcConnection.Close() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, fs.grpcAddress, fs.grpcDialOption) - return fn(client) } func volumeId(fileId string) string { diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 5a63648ca..127be07e3 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" "net/http" "net/url" "time" @@ -38,15 +39,11 @@ func encodeResponse(response interface{}) []byte { func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(ctx, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", s3a.option.FilerGrpcAddress, err) - } - defer grpcConnection.Close() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) - return fn(client) } // If none of the http routes match respond with MethodNotAllowed diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 4c8ff5700..4ae2db030 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -63,7 +63,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, - int(heartbeat.MaxVolumeCount)) + int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 95e55a497..4f0195084 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -68,8 +68,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request } if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) + if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { + err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) } else { count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) } diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index 13208a3f8..f14350f02 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -2,11 +2,7 @@ package shell import ( "context" - "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" - "strings" ) func init() { @@ -35,59 +31,23 @@ func (c *commandFsCd) Help() string { func (c *commandFsCd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { - input := "" - if len(args) > 0 { - input = args[len(args)-1] - } + input := findInputDirectory(args) filerServer, filerPort, path, err := commandEnv.parseUrl(input) if err != nil { return err } - dir, name := filer2.FullPath(path).DirAndName() - if strings.HasSuffix(path, "/") { - if path == "/" { - dir, name = "/", "" - } else { - dir, name = filer2.FullPath(path[0:len(path)-1]).DirAndName() - } + if path == "/" { + commandEnv.option.FilerHost = filerServer + commandEnv.option.FilerPort = filerPort + commandEnv.option.Directory = "/" + return nil } ctx := context.Background() - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: dir, - Prefix: name, - StartFromFileName: name, - InclusiveStartFrom: true, - Limit: 1, - }) - if listErr != nil { - return listErr - } - - if path == "" || path == "/" { - return nil - } - - if len(resp.Entries) == 0 { - return fmt.Errorf("entry not found") - } - - if resp.Entries[0].Name != name { - println("path", path, "dir", dir, "name", name, "found", resp.Entries[0].Name) - return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name) - } - - if !resp.Entries[0].IsDirectory { - return fmt.Errorf("not a directory") - } - - return nil - }) + err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path) if err == nil { commandEnv.option.FilerHost = filerServer diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 98e2eebd1..f305cabdc 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "io" - "strings" ) func init() { @@ -33,21 +32,18 @@ func (c *commandFsDu) Help() string { func (c *commandFsDu) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { - filerServer, filerPort, path, err := commandEnv.parseUrl(args[0]) + filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } - dir, name := filer2.FullPath(path).DirAndName() - if strings.HasSuffix(path, "/") { - if path == "/" { - dir, name = "/", "" - } else { - dir, name = path[0 : len(path)-1], "" - } + ctx := context.Background() + + if commandEnv.isDirectory(ctx, filerServer, filerPort, path) { + path = path + "/" } - ctx := context.Background() + dir, name := filer2.FullPath(path).DirAndName() return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 7b8d1d0cc..93b86fa9f 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -36,41 +36,33 @@ func (c *commandFsLs) Do(args []string, commandEnv *commandEnv, writer io.Writer var isLongFormat, showHidden bool for _, arg := range args { - switch arg { - case "-a": - showHidden = true - case "-l": - isLongFormat = true + if !strings.HasPrefix(arg, "-") { + break } - } - - input := "" - if len(args) > 0 { - input = args[len(args)-1] - if strings.HasPrefix(input, "-") { - input = "" + for _, t := range arg { + switch t { + case 'a': + showHidden = true + case 'l': + isLongFormat = true + } } } + input := findInputDirectory(args) + filerServer, filerPort, path, err := commandEnv.parseUrl(input) if err != nil { return err } - if input == "" && !strings.HasSuffix(path, "/") { + + ctx := context.Background() + + if commandEnv.isDirectory(ctx, filerServer, filerPort, path) { path = path + "/" } dir, name := filer2.FullPath(path).DirAndName() - // println("path", path, "dir", dir, "name", name) - if strings.HasSuffix(path, "/") { - if path == "/" { - dir, name = "/", "" - } else { - dir, name = path[0 : len(path)-1], "" - } - } - - ctx := context.Background() return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 805b17d2a..f1ffc9e4b 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -29,33 +29,33 @@ func (c *commandFsTree) Help() string { func (c *commandFsTree) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { - filerServer, filerPort, path, err := parseFilerUrl(args[len(args)-1]) + filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } dir, name := filer2.FullPath(path).DirAndName() - if strings.HasSuffix(path, "/") { - if path == "/" { - dir, name = "/", "" - } else { - dir, name = path[0:len(path)-1], "" - } - } ctx := context.Background() return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - return treeTraverseDirectory(ctx, writer, client, dir, name, 1000, newPrefix(), 0) + dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, dir, name, newPrefix(), -1) + + if terr == nil { + fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) + } + + return terr }) } -func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int, prefix *Prefix, level int) (err error) { +func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { paginatedCount := -1 startFromFileName := "" + paginateSize := 1000 for paginatedCount == -1 || paginatedCount == paginateSize { resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ @@ -76,17 +76,29 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_p } for i, entry := range resp.Entries { + + if level < 0 && name != "" { + if entry.Name != name { + break + } + } + // 0.1% wrong prefix here, but fixing it would need to paginate to the next batch first isLast := paginatedCount < paginateSize && i == paginatedCount-1 fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name) if entry.IsDirectory { + directoryCount++ subDir := fmt.Sprintf("%s/%s", dir, entry.Name) if dir == "/" { subDir = "/" + entry.Name } - err = treeTraverseDirectory(ctx, writer, client, subDir, "", paginateSize, prefix, level+1) + dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, subDir, "", prefix, level+1) + directoryCount += dirCount + fileCount += fCount + err = terr } else { + fileCount++ } startFromFileName = entry.Name @@ -114,6 +126,9 @@ func (p *Prefix) removeMarker(marker int) { } func (p *Prefix) getPrefix(level int, isLastChild bool) string { var sb strings.Builder + if level < 0 { + return "" + } for i := 0; i < level; i++ { if _, ok := p.markers[i]; ok { sb.WriteString("│") diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index f3f843d58..b3dca0d0b 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -42,30 +42,75 @@ func (c *commandVolumeList) Do(args []string, commandEnv *commandEnv, writer io. return nil } -func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo) { +func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo) statistics { fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount) + var s statistics for _, dc := range t.DataCenterInfos { - writeDataCenterInfo(writer, dc) + s = s.plus(writeDataCenterInfo(writer, dc)) } + fmt.Fprintf(writer, "%+v \n", s) + return s } -func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) { +func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics { fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount) + var s statistics for _, r := range t.RackInfos { - writeRackInfo(writer, r) + s = s.plus(writeRackInfo(writer, r)) } + fmt.Fprintf(writer, " DataCenter %s %+v \n", t.Id, s) + return s } -func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) { +func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount) + var s statistics for _, dn := range t.DataNodeInfos { - writeDataNodeInfo(writer, dn) + s = s.plus(writeDataNodeInfo(writer, dn)) } + fmt.Fprintf(writer, " Rack %s %+v \n", t.Id, s) + return s } -func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) { +func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount) + var s statistics for _, vi := range t.VolumeInfos { - writeVolumeInformationMessage(writer, vi) + s = s.plus(writeVolumeInformationMessage(writer, vi)) } + fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + return s } -func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) { +func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { fmt.Fprintf(writer, " volume %+v \n", t) + return newStatiscis(t) +} + +type statistics struct { + Size uint64 + FileCount uint64 + DeletedFileCount uint64 + DeletedBytes uint64 +} + +func newStatiscis(t *master_pb.VolumeInformationMessage) statistics { + return statistics{ + Size: t.Size, + FileCount: t.FileCount, + DeletedFileCount: t.DeleteCount, + DeletedBytes: t.DeletedByteCount, + } +} + +func (s statistics) plus(t statistics) statistics { + return statistics{ + Size: s.Size + t.Size, + FileCount: s.FileCount + t.FileCount, + DeletedFileCount: s.DeletedFileCount + t.DeletedFileCount, + DeletedBytes: s.DeletedBytes + t.DeletedBytes, + } +} + +func (s statistics) String() string { + if s.DeletedFileCount > 0 { + return fmt.Sprintf("total size:%d file_count:%d deleted_file:%d deleted_bytes:%d", s.Size, s.FileCount, s.DeletedFileCount, s.DeletedBytes) + } + return fmt.Sprintf("total size:%d file_count:%d", s.Size, s.FileCount) } diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 2a262d913..50b70498d 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -1,7 +1,10 @@ package shell import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" "io" @@ -46,6 +49,46 @@ func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int6 return ce.option.FilerHost, ce.option.FilerPort, input, err } +func (ce *commandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool { + + return ce.checkDirectory(ctx,filerServer,filerPort,path) == nil + +} + +func (ce *commandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error { + + dir, name := filer2.FullPath(path).DirAndName() + + return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + + resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: dir, + Prefix: name, + StartFromFileName: name, + InclusiveStartFrom: true, + Limit: 1, + }) + if listErr != nil { + return listErr + } + + if len(resp.Entries) == 0 { + return fmt.Errorf("entry not found") + } + + if resp.Entries[0].Name != name { + return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name) + } + + if !resp.Entries[0].IsDirectory { + return fmt.Errorf("not a directory") + } + + return nil + }) + +} + func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(entryPath, "http") { var u *url.URL @@ -64,3 +107,14 @@ func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path } return } + +func findInputDirectory(args []string) (input string) { + input = "." + if len(args) > 0 { + input = args[len(args)-1] + if strings.HasPrefix(input, "-") { + input = "." + } + } + return input +} diff --git a/weed/storage/needle_parse_multipart.go b/weed/storage/needle_parse_multipart.go index e8d57ee38..3dba81fcf 100644 --- a/weed/storage/needle_parse_multipart.go +++ b/weed/storage/needle_parse_multipart.go @@ -88,10 +88,12 @@ func parseMultipart(r *http.Request) ( } isGzipped = true } else if operation.IsGzippable(ext, mtype, data) { - if data, e = operation.GzipData(data); e != nil { - return + if compressedData, err := operation.GzipData(data); err == nil { + if len(data) > len(compressedData) { + data = compressedData + isGzipped = true + } } - isGzipped = true } } diff --git a/weed/topology/node.go b/weed/topology/node.go index b7d2f79ec..db70c9734 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -5,6 +5,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -14,16 +15,16 @@ type NodeId string type Node interface { Id() NodeId String() string - FreeSpace() int - ReserveOneVolume(r int) (*DataNode, error) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + FreeSpace() int64 + ReserveOneVolume(r int64) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) + UpAdjustVolumeCountDelta(volumeCountDelta int64) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) UpAdjustMaxVolumeId(vid storage.VolumeId) - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int + GetVolumeCount() int64 + GetActiveVolumeCount() int64 + GetMaxVolumeCount() int64 GetMaxVolumeId() storage.VolumeId SetParent(Node) LinkChildNode(node Node) @@ -40,9 +41,9 @@ type Node interface { } type NodeImpl struct { id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int + volumeCount int64 + activeVolumeCount int64 + maxVolumeCount int64 parent Node sync.RWMutex // lock children children map[NodeId]Node @@ -126,7 +127,7 @@ func (n *NodeImpl) String() string { func (n *NodeImpl) Id() NodeId { return n.id } -func (n *NodeImpl) FreeSpace() int { +func (n *NodeImpl) FreeSpace() int64 { return n.maxVolumeCount - n.volumeCount } func (n *NodeImpl) SetParent(node Node) { @@ -146,7 +147,7 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { +func (n *NodeImpl) ReserveOneVolume(r int64) (assignedNode *DataNode, err error) { n.RLock() defer n.RUnlock() for _, node := range n.children { @@ -171,20 +172,20 @@ func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { return nil, errors.New("No free volume slot found!") } -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.maxVolumeCount, maxVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) } } -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.volumeCount, volumeCountDelta) if n.parent != nil { n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) } } -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int64) { //can be negative + atomic.AddInt64(&n.activeVolumeCount, activeVolumeCountDelta) if n.parent != nil { n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) } @@ -200,13 +201,13 @@ func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { return n.maxVolumeId } -func (n *NodeImpl) GetVolumeCount() int { +func (n *NodeImpl) GetVolumeCount() int64 { return n.volumeCount } -func (n *NodeImpl) GetActiveVolumeCount() int { +func (n *NodeImpl) GetActiveVolumeCount() int64 { return n.activeVolumeCount } -func (n *NodeImpl) GetMaxVolumeCount() int { +func (n *NodeImpl) GetMaxVolumeCount() int64 { return n.maxVolumeCount } diff --git a/weed/topology/rack.go b/weed/topology/rack.go index f8f8ce34a..932c1a804 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -28,7 +28,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int64) *DataNode { for _, c := range r.Children() { dn := c.(*DataNode) if dn.MatchLocation(ip, port) { diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 07dc9c67b..a8bdec902 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -47,8 +47,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) } { @@ -71,13 +71,13 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { } topo.SyncDataNodeRegistration(volumeMessages, dn) - assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount) - assert(t, "volumeCount", topo.volumeCount, volumeCount) + assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount) + assert(t, "volumeCount", int(topo.volumeCount), volumeCount) } topo.UnRegisterDataNode(dn) - assert(t, "activeVolumeCount2", topo.activeVolumeCount, 0) + assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0) } diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index ef39a1c01..514033ca1 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -105,7 +105,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.DiffRackCount+rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 @@ -134,7 +134,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.FreeSpace() < rp.SameRackCount+1 { + if node.FreeSpace() < int64(rp.SameRackCount+1) { return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { @@ -175,7 +175,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Intn(rack.FreeSpace()) + r := rand.Int63n(rack.FreeSpace()) if server, e := rack.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { @@ -183,7 +183,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } } for _, datacenter := range otherDataCenters { - r := rand.Intn(datacenter.FreeSpace()) + r := rand.Int63n(datacenter.FreeSpace()) if server, e := datacenter.ReserveOneVolume(r); e == nil { servers = append(servers, server) } else { diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index f983df1ec..1963cb928 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -101,7 +101,7 @@ func setup(topologyLayout string) *Topology { Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + server.UpAdjustMaxVolumeCountDelta(int64(serverMap["limit"].(float64))) } } } diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go new file mode 100644 index 000000000..ffcfef6d5 --- /dev/null +++ b/weed/util/file_util_non_posix.go @@ -0,0 +1,12 @@ +// +build linux darwin freebsd netbsd openbsd plan9 solaris zos + +package util + +import ( + "os" + "syscall" +) + +func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) { + return fi.Sys().(*syscall.Stat_t).Uid, fi.Sys().(*syscall.Stat_t).Gid +} diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go new file mode 100644 index 000000000..22ca60b3b --- /dev/null +++ b/weed/util/file_util_posix.go @@ -0,0 +1,11 @@ +// +build windows + +package util + +import ( + "os" +) + +func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) { + return 0, 0 +} diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go index e5993aeab..5c08538dc 100644 --- a/weed/util/grpc_client_server.go +++ b/weed/util/grpc_client_server.go @@ -3,6 +3,7 @@ package util import ( "context" "fmt" + "net/http" "strconv" "strings" "sync" @@ -18,6 +19,10 @@ var ( grpcClientsLock sync.Mutex ) +func init(){ + http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 100 +} + func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { var options []grpc.ServerOption options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{ diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 794471f7b..7a0bc9181 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -106,15 +106,11 @@ func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.Di return fmt.Errorf("failed to parse master grpc %v", master) } - grpcConnection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", master, err) - } - defer grpcConnection.Close() - - client := master_pb.NewSeaweedClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := master_pb.NewSeaweedClient(grpcConnection) + return fn(ctx, client) + }, masterGrpcAddress, grpcDialOption) - return fn(ctx, client) } func (mc *MasterClient) WithClient(ctx context.Context, fn func(client master_pb.SeaweedClient) error) error { |
