about summary refs log tree commit diff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--  weed/command/filer_copy.go  475
1 file changed, 320 insertions(+), 155 deletions(-)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index b519afe86..0aee8cd80 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -1,53 +1,60 @@
package command
import (
+ "context"
"fmt"
+ "io"
"io/ioutil"
+ "net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
- "context"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- "io"
- "net/http"
- "path"
- "strconv"
- "time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
var (
- copy CopyOptions
+ copy CopyOptions
+ waitGroup sync.WaitGroup
)
type CopyOptions struct {
- filerGrpcPort *int
- master *string
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
- secretKey *string
-
- secret security.Secret
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ masterClient *wdclient.MasterClient
+ concurrenctFiles *int
+ concurrenctChunks *int
+ grpcDialOption grpc.DialOption
+ masters []string
+ cipher bool
+ ttlSec int32
}
func init() {
cmdCopy.Run = runCopy // break init cycle
cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
- copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
copy.include = cmdCopy.Flag.String("include", "", "patterns of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
- copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
- copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
- copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+ copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
+ copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
}
var cmdCopy = &Command{
@@ -61,17 +68,15 @@ var cmdCopy = &Command{
All files under the folder and subfolders will be copied.
Optional parameter "-include" allows you to specify the file name patterns.
- If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
- This can save volume server's gzipped processing and allow customizable gzip compression level.
- The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
-
If "maxMB" is set to a positive number, files larger than it would be split into chunks.
`,
}
func runCopy(cmd *Command, args []string) bool {
- copy.secret = security.Secret(*copy.secretKey)
+
+ util.LoadConfiguration("security", false)
+
if len(args) <= 1 {
return false
}
@@ -85,7 +90,8 @@ func runCopy(cmd *Command, args []string) bool {
}
urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
- urlPath = urlPath + "/"
+ fmt.Printf("The last argument should be a folder and end with \"/\"\n")
+ return false
}
if filerUrl.Port() == "" {
@@ -100,253 +106,412 @@ func runCopy(cmd *Command, args []string) bool {
}
filerGrpcPort := filerPort + 10000
- if *copy.filerGrpcPort != 0 {
- filerGrpcPort = uint64(*copy.filerGrpcPort)
+ filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
+ if err != nil {
+ fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
+ return false
+ }
+ if *copy.collection == "" {
+ *copy.collection = collection
+ }
+ if *copy.replication == "" {
+ *copy.replication = replication
}
+ if *copy.maxMB == 0 {
+ *copy.maxMB = int(maxMB)
+ }
+ copy.masters = masters
+ copy.cipher = cipher
- filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
- for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
- return false
+ if *cmdCopy.IsDebug {
+ util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
+ }
+
+ fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
+
+ go func() {
+ defer close(fileCopyTaskChan)
+ for _, fileOrDir := range fileOrDirs {
+ if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
+ break
+ }
}
+ }()
+ for i := 0; i < *copy.concurrenctFiles; i++ {
+ waitGroup.Add(1)
+ go func() {
+ defer waitGroup.Done()
+ worker := FileCopyWorker{
+ options: &copy,
+ filerHost: filerUrl.Host,
+ filerGrpcAddress: filerGrpcAddress,
+ }
+ if err := worker.copyFiles(fileCopyTaskChan); err != nil {
+ fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
+ return
+ }
+ }()
}
+ waitGroup.Wait()
+
return true
}
-func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
- f, err := os.Open(fileOrDir)
- if err != nil {
- fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
- return false
- }
- defer f.Close()
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ cipher = resp.Cipher
+ return nil
+ })
+ return
+}
+
+func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
- fi, err := f.Stat()
+ fi, err := os.Stat(fileOrDir)
if err != nil {
- fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
- return false
+ fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
+ return nil
}
mode := fi.Mode()
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
- return false
+ if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
+ return err
}
}
- return true
+ return nil
+ }
+
+ uid, gid := util.GetFileUidGid(fi)
+
+ fileCopyTaskChan <- FileCopyTask{
+ sourceLocation: fileOrDir,
+ destinationUrlPath: destPath,
+ fileSize: fi.Size(),
+ fileMode: fi.Mode(),
+ uid: uid,
+ gid: gid,
+ }
+
+ return nil
+}
+
+type FileCopyWorker struct {
+ options *CopyOptions
+ filerHost string
+ filerGrpcAddress string
+}
+
+func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
+ for task := range fileCopyTaskChan {
+ if err := worker.doEachCopy(task); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type FileCopyTask struct {
+ sourceLocation string
+ destinationUrlPath string
+ fileSize int64
+ fileMode os.FileMode
+ uid uint32
+ gid uint32
+}
+
+func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
+
+ f, err := os.Open(task.sourceLocation)
+ if err != nil {
+ fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
+ if _, ok := err.(*os.PathError); ok {
+ fmt.Printf("skipping %s\n", task.sourceLocation)
+ return nil
+ }
+ return err
}
+ defer f.Close()
// this is a regular file
- if *copy.include != "" {
- if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
- return true
+ if *worker.options.include != "" {
+ if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
+ return nil
}
}
// find the chunk count
- chunkSize := int64(*copy.maxMB * 1024 * 1024)
+ chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
- if chunkSize > 0 && fi.Size() > chunkSize {
- chunkCount = int(fi.Size()/chunkSize) + 1
+ if chunkSize > 0 && task.fileSize > chunkSize {
+ chunkCount = int(task.fileSize/chunkSize) + 1
}
if chunkCount == 1 {
- return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
+ return worker.uploadFileAsOne(task, f)
}
- return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
-func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
+func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
- isGzipped := isGzipped(fileName)
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return err
+ }
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
- if fi.Size() > 0 {
+ if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.Upload(targetUrl, fileName, f, isGzipped, mimeType, nil, "")
+ uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: 0,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: assignResult.FileId,
+ Offset: 0,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.Md5,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
})
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- return true
+ return nil
}
-func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
- var chunks []*filer_pb.FileChunk
+ chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
+
+ concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
+ var wg sync.WaitGroup
+ var uploadError error
+ var collection, replication string
+
+ fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
+ for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
+ wg.Add(1)
+ concurrentChunks <- struct{}{}
+ go func(i int64) {
+ defer func() {
+ wg.Done()
+ <-concurrentChunks
+ }()
+ // assign a volume
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- for i := int64(0); i < int64(chunkCount); i++ {
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
- // assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *copy.replication,
- Collection: *copy.collection,
- Ttl: *copy.ttl,
- })
- if err != nil {
- fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
- }
+ uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
+ if err != nil {
+ uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return
+ }
+ if uploadResult.Error != "" {
+ uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return
+ }
+ chunksChan <- &filer_pb.FileChunk{
+ FileId: assignResult.FileId,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
+ }
+ fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ }(i)
+ }
+ wg.Wait()
+ close(chunksChan)
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ var chunks []*filer_pb.FileChunk
+ for chunk := range chunksChan {
+ chunks = append(chunks, chunk)
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.LimitReader(f, chunkSize),
- false, "application/octet-stream", nil, "")
- if err != nil {
- fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- return false
- }
- if uploadResult.Error != "" {
- fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- return false
+ if uploadError != nil {
+ var fileIds []string
+ for _, chunk := range chunks {
+ fileIds = append(fileIds, chunk.FileId)
}
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
- fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
+ return uploadError
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: urlFolder,
+ Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
Crtime: time.Now().Unix(),
Mtime: time.Now().Unix(),
- Gid: uint32(os.Getgid()),
- Uid: uint32(os.Getuid()),
- FileSize: uint64(fi.Size()),
- FileMode: uint32(fi.Mode()),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *copy.replication,
- Collection: *copy.collection,
- TtlSec: int32(util.ParseInt(*copy.ttl, 0)),
+ Replication: replication,
+ Collection: collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
- return false
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
- return true
-}
-
-func isGzipped(filename string) bool {
- return strings.ToLower(path.Ext(filename)) == ".gz"
+ return nil
}
func detectMimeType(f *os.File) string {
head := make([]byte, 512)
- f.Seek(0, 0)
+ f.Seek(0, io.SeekStart)
n, err := f.Read(head)
if err == io.EOF {
return ""
}
if err != nil {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
- return "application/octet-stream"
+ return ""
}
- f.Seek(0, 0)
+ f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
- return mimeType
-}
-
-func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- grpcConnection, err := util.GrpcDial(filerAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
+ if mimeType == "application/octet-stream" {
+ return ""
}
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
-
- return fn(client)
+ return mimeType
}