Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/filechunk_manifest.go      | 136 |
| -rw-r--r-- | weed/filer2/filechunk_manifest_test.go | 113 |
| -rw-r--r-- | weed/filer2/filechunks.go              |  32 |
| -rw-r--r-- | weed/filer2/filechunks_test.go         |  10 |
| -rw-r--r-- | weed/filer2/reader_at.go               |  17 |
| -rw-r--r-- | weed/filer2/stream.go                  |  30 |
6 files changed, 299 insertions, 39 deletions
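
The new write path batches every 10,000 data chunks (the mergeFactor passed by MaybeManifestize) into a serialized filer_pb.FileChunkManifest, saves that blob through the caller-supplied SaveDataAsChunkFunctionType, and replaces the batch with a single chunk flagged IsChunkManifest. A minimal sketch of that contract, not part of the commit: the in-memory save function and the fake "mem,N"/"vol,N" file ids below are invented for illustration.

package main

import (
	"fmt"
	"io"
	"io/ioutil"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func main() {
	// Hypothetical stand-in for a real save function: a real caller would
	// upload to a volume server and return the assigned file id; here we
	// invent "mem,N" ids and keep the manifest bytes in memory.
	blobs := map[string][]byte{}
	n := 0
	var save filer2.SaveDataAsChunkFunctionType = func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
		data, err := ioutil.ReadAll(reader)
		if err != nil {
			return nil, "", "", err
		}
		n++
		fileId := fmt.Sprintf("mem,%d", n)
		blobs[fileId] = data
		return &filer_pb.FileChunk{FileId: fileId}, "", "", nil
	}

	// 20,001 data chunks: two full batches of 10,000 become manifest
	// chunks, and the one leftover chunk is passed through unchanged.
	var chunks []*filer_pb.FileChunk
	for i := 0; i < 20001; i++ {
		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: fmt.Sprintf("vol,%d", i),
			Offset: int64(i) * 1024,
			Size:   1024,
		})
	}

	merged, err := filer2.MaybeManifestize(save, chunks)
	fmt.Println(len(merged), err) // expected: 3 <nil>
}

Chunks that do not fill a complete batch stay as plain data chunks, which is why three chunks come back for 20,001 inputs.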
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
new file mode 100644
index 000000000..92b114853
--- /dev/null
+++ b/weed/filer2/filechunk_manifest.go
@@ -0,0 +1,136 @@
+package filer2
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"math"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
+	for _, chunk := range chunks {
+		if chunk.IsChunkManifest {
+			return true
+		}
+	}
+	return false
+}
+
+func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+	// TODO maybe parallel this
+	for _, chunk := range chunks {
+		if !chunk.IsChunkManifest {
+			dataChunks = append(dataChunks, chunk)
+			continue
+		}
+
+		// IsChunkManifest
+		data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
+		if err != nil {
+			return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
+		}
+		m := &filer_pb.FileChunkManifest{}
+		if err := proto.Unmarshal(data, m); err != nil {
+			return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
+		}
+		manifestChunks = append(manifestChunks, chunk)
+		// recursive
+		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
+		if subErr != nil {
+			return chunks, nil, subErr
+		}
+		dataChunks = append(dataChunks, dchunks...)
+		manifestChunks = append(manifestChunks, mchunks...)
+	}
+	return
+}
+
+func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+	urlString, err := lookupFileIdFn(fileId)
+	if err != nil {
+		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
+		return nil, err
+	}
+	var buffer bytes.Buffer
+	err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+		buffer.Write(data)
+	})
+	if err != nil {
+		glog.V(0).Infof("read %s failed, err: %v", fileId, err)
+		return nil, err
+	}
+
+	return buffer.Bytes(), nil
+}
+
+func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
+	return doMaybeManifestize(saveFunc, dataChunks, 10000, mergeIntoManifest)
+}
+
+func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
+
+	var dataChunks []*filer_pb.FileChunk
+	for _, chunk := range inputChunks {
+		if !chunk.IsChunkManifest {
+			dataChunks = append(dataChunks, chunk)
+		} else {
+			chunks = append(chunks, chunk)
+		}
+	}
+
+	manifestBatch := mergeFactor
+	remaining := len(dataChunks)
+	for i := 0; i+manifestBatch <= len(dataChunks); i += manifestBatch {
+		chunk, err := mergefn(saveFunc, dataChunks[i:i+manifestBatch])
+		if err != nil {
+			return dataChunks, err
+		}
+		chunks = append(chunks, chunk)
+		remaining -= manifestBatch
+	}
+	// remaining
+	for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
+		chunks = append(chunks, dataChunks[i])
+	}
+	return
+}
+
+func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+	// create and serialize the manifest
+	data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
+		Chunks: dataChunks,
+	})
+	if serErr != nil {
+		return nil, fmt.Errorf("serializing manifest: %v", serErr)
+	}
+
+	minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+	for k := 0; k < len(dataChunks); k++ {
+		chunk := dataChunks[k]
+		if minOffset > chunk.Offset {
+			minOffset = chunk.Offset
+		}
+		if maxOffset < int64(chunk.Size)+chunk.Offset {
+			maxOffset = int64(chunk.Size) + chunk.Offset
+		}
+	}
+
+	manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
+	if err != nil {
+		return nil, err
+	}
+	manifestChunk.IsChunkManifest = true
+	manifestChunk.Offset = minOffset
+	manifestChunk.Size = uint64(maxOffset - minOffset)
+
+	return
+}
+
+type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)
diff --git a/weed/filer2/filechunk_manifest_test.go b/weed/filer2/filechunk_manifest_test.go
new file mode 100644
index 000000000..2b0862d07
--- /dev/null
+++ b/weed/filer2/filechunk_manifest_test.go
@@ -0,0 +1,113 @@
+package filer2
+
+import (
+	"bytes"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestDoMaybeManifestize(t *testing.T) {
+	var manifestTests = []struct {
+		inputs   []*filer_pb.FileChunk
+		expected []*filer_pb.FileChunk
+	}{
+		{
+			inputs: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: false},
+				{FileId: "2", IsChunkManifest: false},
+				{FileId: "3", IsChunkManifest: false},
+				{FileId: "4", IsChunkManifest: false},
+			},
+			expected: []*filer_pb.FileChunk{
+				{FileId: "12", IsChunkManifest: true},
+				{FileId: "34", IsChunkManifest: true},
+			},
+		},
+		{
+			inputs: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: true},
+				{FileId: "2", IsChunkManifest: false},
+				{FileId: "3", IsChunkManifest: false},
+				{FileId: "4", IsChunkManifest: false},
+			},
+			expected: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: true},
+				{FileId: "23", IsChunkManifest: true},
+				{FileId: "4", IsChunkManifest: false},
+			},
+		},
+		{
+			inputs: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: false},
+				{FileId: "2", IsChunkManifest: true},
+				{FileId: "3", IsChunkManifest: false},
+				{FileId: "4", IsChunkManifest: false},
+			},
+			expected: []*filer_pb.FileChunk{
+				{FileId: "2", IsChunkManifest: true},
+				{FileId: "13", IsChunkManifest: true},
+				{FileId: "4", IsChunkManifest: false},
+			},
+		},
+		{
+			inputs: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: true},
+				{FileId: "2", IsChunkManifest: true},
+				{FileId: "3", IsChunkManifest: false},
+				{FileId: "4", IsChunkManifest: false},
+			},
+			expected: []*filer_pb.FileChunk{
+				{FileId: "1", IsChunkManifest: true},
+				{FileId: "2", IsChunkManifest: true},
+				{FileId: "34", IsChunkManifest: true},
+			},
+		},
+	}
+
+	for i, mtest := range manifestTests {
+		println("test", i)
+		actual, _ := doMaybeManifestize(nil, mtest.inputs, 2, mockMerge)
+		assertEqualChunks(t, mtest.expected, actual)
+	}
+
+}
+
+func assertEqualChunks(t *testing.T, expected, actual []*filer_pb.FileChunk) {
+	assert.Equal(t, len(expected), len(actual))
+	for i := 0; i < len(actual); i++ {
+		assertEqualChunk(t, expected[i], actual[i])
+	}
+}
+func assertEqualChunk(t *testing.T, expected, actual *filer_pb.FileChunk) {
+	assert.Equal(t, expected.FileId, actual.FileId)
+	assert.Equal(t, expected.IsChunkManifest, actual.IsChunkManifest)
+}
+
+func mockMerge(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
+
+	var buf bytes.Buffer
+	minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
+	for k := 0; k < len(dataChunks); k++ {
+		chunk := dataChunks[k]
+		buf.WriteString(chunk.FileId)
+		if minOffset > chunk.Offset {
+			minOffset = chunk.Offset
+		}
+		if maxOffset < int64(chunk.Size)+chunk.Offset {
+			maxOffset = int64(chunk.Size) + chunk.Offset
+		}
+	}
+
+	manifestChunk = &filer_pb.FileChunk{
+		FileId: buf.String(),
+	}
+	manifestChunk.IsChunkManifest = true
+	manifestChunk.Offset = minOffset
+	manifestChunk.Size = uint64(maxOffset - minOffset)
+
+	return
+}
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 6832d0f31..ea7772b4a 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -46,9 +46,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
 	return fmt.Sprintf("%x", h.Sum32())
 }
 
-func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
 
-	visibles := NonOverlappingVisibleIntervals(chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
 	fileIds := make(map[string]bool)
 	for _, interval := range visibles {
@@ -65,7 +65,23 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
 	return
 }
 
-func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+
+	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+	if aErr != nil {
+		return nil, aErr
+	}
+	bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+	if bErr != nil {
+		return nil, bErr
+	}
+
+	delta = append(delta, DoMinusChunks(aData, bData)...)
+	delta = append(delta, DoMinusChunks(aMeta, bMeta)...)
+	return
+}
+
+func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
 
 	fileIds := make(map[string]bool)
 	for _, interval := range bs {
@@ -94,9 +110,9 @@ func (cv *ChunkView) IsFullChunk() bool {
 	return cv.Size == cv.ChunkSize
 }
 
-func ViewFromChunks(chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
 
-	visibles := NonOverlappingVisibleIntervals(chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
 
 	return ViewFromVisibleIntervals(visibles, offset, size)
 
@@ -190,7 +206,11 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
 	return newVisibles
 }
 
-func NonOverlappingVisibleIntervals(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+// NonOverlappingVisibleIntervals translates the file chunks into VisibleIntervals in memory.
+// If a file chunk is a chunk manifest, it is first resolved into the data chunks it references.
+func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+
+	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
 
 	sort.Slice(chunks, func(i, j int) bool {
 		return chunks[i].Mtime < chunks[j].Mtime
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index 7b1133b85..bfee59198 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -16,7 +16,7 @@ func TestCompactFileChunks(t *testing.T) {
 		{Offset: 110, Size: 200, FileId: "jkl", Mtime: 300},
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 3 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -49,7 +49,7 @@ func TestCompactFileChunks2(t *testing.T) {
 		})
 	}
 
-	compacted, garbage := CompactFileChunks(chunks)
+	compacted, garbage := CompactFileChunks(nil, chunks)
 
 	if len(compacted) != 4 {
 		t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -186,7 +186,7 @@ func TestIntervalMerging(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
-		intervals := NonOverlappingVisibleIntervals(testcase.Chunks)
+		intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
 		for x, interval := range intervals {
 			log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
 				i, x, interval.start, interval.stop, interval.fileId)
@@ -371,7 +371,7 @@ func TestChunksReading(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
-		chunks := ViewFromChunks(testcase.Chunks, testcase.Offset, testcase.Size)
+		chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
 		for x, chunk := range chunks {
 			log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
 				i, x, chunk.Offset, chunk.Size, chunk.FileId)
@@ -415,6 +415,6 @@ func BenchmarkCompactFileChunks(b *testing.B) {
 	}
 
 	for n := 0; n < b.N; n++ {
-		CompactFileChunks(chunks)
+		CompactFileChunks(nil, chunks)
 	}
 }
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index 11a80443f..568d94267 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -1,7 +1,6 @@
 package filer2
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -9,7 +8,6 @@
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
 )
@@ -144,19 +142,6 @@ func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err err
 
 func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
 
-	urlString, err := c.lookupFileId(fileId)
-	if err != nil {
-		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
-		return nil, err
-	}
-	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
-		buffer.Write(data)
-	})
-	if err != nil {
-		glog.V(0).Infof("read %s failed, err: %v", fileId, err)
-		return nil, err
-	}
+	return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
 
-	return buffer.Bytes(), nil
 }
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 033a8dd13..c7df007ec 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -2,6 +2,7 @@ package filer2
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"math"
 	"strings"
@@ -14,7 +15,8 @@ import (
 
 func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
 
-	chunkViews := ViewFromChunks(chunks, offset, size)
+	fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+	chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
 
 	fileId2Url := make(map[string]string)
 
@@ -50,14 +52,14 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 
 	buffer := bytes.Buffer{}
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
-
-	lookupFileId := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
 
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
 	for _, chunkView := range chunkViews {
-		urlString, err := lookupFileId(chunkView.FileId)
+		urlString, err := lookupFileIdFn(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
@@ -88,23 +90,27 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
 
 func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	return &ChunkStreamReader{
-		chunkViews: chunkViews,
-		lookupFileId: func(fileId string) (targetUrl string, err error) {
-			return masterClient.LookupFileId(fileId)
-		},
+		chunkViews:   chunkViews,
+		lookupFileId: lookupFileIdFn,
 	}
 }
 
 func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-	chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+	lookupFileIdFn := LookupFn(filerClient)
+
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	return &ChunkStreamReader{
 		chunkViews:   chunkViews,
-		lookupFileId: LookupFn(filerClient),
+		lookupFileId: lookupFileIdFn,
 	}
 }
 
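
On the read path, ResolveChunkManifest recursively expands any chunk flagged IsChunkManifest by fetching and unmarshaling its manifest blob before visible intervals or chunk views are computed. A sketch of that round trip under stated assumptions, not part of the commit: the httptest server below stands in for a volume server, and it assumes util.ReadUrlAsStream performs a plain HTTP GET when no cipher key or compression is involved.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/golang/protobuf/proto"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func main() {
	// Marshal a manifest that references two plain data chunks.
	manifestData, _ := proto.Marshal(&filer_pb.FileChunkManifest{
		Chunks: []*filer_pb.FileChunk{
			{FileId: "3,01", Offset: 0, Size: 1024},
			{FileId: "3,02", Offset: 1024, Size: 1024},
		},
	})

	// Hypothetical volume server: every read returns the manifest bytes.
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write(manifestData)
	}))
	defer ts.Close()

	// Every file id resolves to the test server's URL.
	lookup := func(fileId string) (string, error) { return ts.URL, nil }

	// One manifest chunk in; two data chunks plus the manifest chunk out.
	dataChunks, manifestChunks, err := filer2.ResolveChunkManifest(lookup, []*filer_pb.FileChunk{
		{FileId: "3,ff", IsChunkManifest: true, Offset: 0, Size: 2048},
	})
	fmt.Println(len(dataChunks), len(manifestChunks), err) // expected: 2 1 <nil>
}

Callers that know no manifest chunks are present may pass a nil lookup function, which is exactly what the updated tests in filechunks_test.go do.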
