diff options
Diffstat (limited to 'weed/filer/filechunk_manifest.go')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 51 |
1 files changed, 33 insertions, 18 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index b6a64b30d..a1f84b38e 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -8,6 +8,7 @@ import ( "math" "net/url" "strings" + "sync" "time" "github.com/golang/protobuf/proto" @@ -21,6 +22,12 @@ const ( ManifestBatch = 10000 ) +var bytesBufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { for _, chunk := range chunks { if chunk.IsChunkManifest { @@ -61,12 +68,12 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun manifestChunks = append(manifestChunks, chunk) // recursive - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) + dataChunks, manifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { return chunks, nil, subErr } - dataChunks = append(dataChunks, dchunks...) - manifestChunks = append(manifestChunks, mchunks...) + dataChunks = append(dataChunks, dataChunks...) + manifestChunks = append(manifestChunks, manifestChunks...) } return } @@ -77,12 +84,15 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) + bytesBuffer.Reset() + defer bytesBufferPool.Put(bytesBuffer) + err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { + if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil { return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) } @@ -92,38 +102,43 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount? -func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { +func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err + return err + } + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + if err != nil { + return err } - return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0) + return nil } -func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) { +func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { urlStrings, err := lookupFileIdFn(fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) - return nil, err + return 0, err } - return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size) + return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { +func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { - var err error var shouldRetry bool - receivedData := make([]byte, 0, size) for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - receivedData = receivedData[:0] + n = 0 if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { - receivedData = append(receivedData, data...) + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } }) if !shouldRetry { break @@ -142,7 +157,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } } - return receivedData, err + return n, err } |
