aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-03-02 13:50:46 -0800
committerchrislu <chris.lu@gmail.com>2022-03-02 13:50:46 -0800
commit6fbbc785745af3427028dfa78a26466bdfaed41f (patch)
tree47eaab94a763862fb72fa427e2880ad24b285a5d
parent784583afc6513f2083190a1e0765cf181b88cba6 (diff)
downloadseaweedfs-6fbbc785745af3427028dfa78a26466bdfaed41f.tar.xz
seaweedfs-6fbbc785745af3427028dfa78a26466bdfaed41f.zip
stream reading a whole chunk
-rw-r--r--weed/filer/filechunk_manifest.go28
1 files changed, 19 insertions, 9 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index f2f1c7f16..c74af226c 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,12 +3,12 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util/mem"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"net/url"
"strings"
+ "sync"
"time"
"github.com/golang/protobuf/proto"
@@ -19,9 +19,15 @@ import (
)
const (
- ManifestBatch = 10000
+ ManifestBatch = 3
)
+var bytesBufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
for _, chunk := range chunks {
if chunk.IsChunkManifest {
@@ -78,14 +84,14 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// IsChunkManifest
- data := mem.Allocate(int(chunk.Size))
- defer mem.Free(data)
- _, err := fetchChunk(data, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
+ defer bytesBufferPool.Put(bytesBuffer)
+ err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
m := &filer_pb.FileChunkManifest{}
- if err := proto.Unmarshal(data, m); err != nil {
+ if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
}
@@ -95,13 +101,17 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// TODO fetch from cache for weed mount?
-func fetchChunk(data []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) (int, error) {
+func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return 0, err
+ return err
+ }
+ err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
+ if err != nil {
+ return err
}
- return retriedFetchChunkData(data, urlStrings, cipherKey, isGzipped, true, 0)
+ return nil
}
func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {