aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/chunked_file.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/chunked_file.go')
-rw-r--r--weed/operation/chunked_file.go62
1 files changed, 38 insertions, 24 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 295204dd8..8506e0518 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -8,11 +8,10 @@ import (
"io/ioutil"
"net/http"
"sort"
+ "sync"
"google.golang.org/grpc"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -41,23 +40,24 @@ type ChunkManifest struct {
// seekable chunked file reader
type ChunkedFileReader struct {
- Manifest *ChunkManifest
- Master string
- pos int64
- pr *io.PipeReader
- pw *io.PipeWriter
- mutex sync.Mutex
+ totalSize int64
+ chunkList []*ChunkInfo
+ master string
+ pos int64
+ pr *io.PipeReader
+ pw *io.PipeWriter
+ mutex sync.Mutex
}
func (s ChunkList) Len() int { return len(s) }
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
- if isGzipped {
+func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
+ if isCompressed {
var err error
- if buffer, err = util.UnGzipData(buffer); err != nil {
- return nil, err
+ if buffer, err = util.DecompressData(buffer); err != nil {
+ glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
}
}
cm := ChunkManifest{}
@@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
+func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, grpcDialOption, fileIds)
+ results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -126,16 +126,29 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
return io.Copy(w, resp.Body)
}
+func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileReader {
+ var totalSize int64
+ for _, chunk := range chunkList {
+ totalSize += chunk.Size
+ }
+ sort.Sort(ChunkList(chunkList))
+ return &ChunkedFileReader{
+ totalSize: totalSize,
+ chunkList: chunkList,
+ master: master,
+ }
+}
+
func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
- case 1:
+ case io.SeekStart:
+ case io.SeekCurrent:
offset += cf.pos
- case 2:
- offset = cf.Manifest.Size - offset
+ case io.SeekEnd:
+ offset = cf.totalSize + offset
}
- if offset > cf.Manifest.Size {
+ if offset > cf.totalSize {
err = ErrInvalidRange
}
if cf.pos != offset {
@@ -146,10 +159,9 @@ func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
}
func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
- cm := cf.Manifest
chunkIndex := -1
chunkStartOffset := int64(0)
- for i, ci := range cm.Chunks {
+ for i, ci := range cf.chunkList {
if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
chunkIndex = i
chunkStartOffset = cf.pos - ci.Offset
@@ -159,10 +171,12 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
if chunkIndex < 0 {
return n, ErrInvalidRange
}
- for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
- ci := cm.Chunks[chunkIndex]
+ for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
+ ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
+ fileUrl, lookupError := LookupFileId(func() string {
+ return cf.master
+ }, ci.Fid)
if lookupError != nil {
return n, lookupError
}