aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filechunks.go118
1 files changed, 115 insertions, 3 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 7e97118a6..be18d45ac 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -3,10 +3,13 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
+ "sort"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
@@ -161,16 +164,112 @@ func logPrintf(name string, visibles []VisibleInterval) {
*/
}
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(VisibleInterval)
+ },
+}
+
+func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) {
+
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
+
+ length := len(visibles)
+ if length == 0 {
+ return append(visibles, newV)
+ }
+ last := visibles[length-1]
+ if last.stop <= chunk.Offset {
+ return append(visibles, newV)
+ }
+
+ logPrintf(" before", visibles)
+ // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size)
+ chunkStop := chunk.Offset + int64(chunk.Size)
+ for _, v := range visibles {
+ if v.start < chunk.Offset && chunk.Offset < v.stop {
+ t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)
+ newVisibles = append(newVisibles, t)
+ // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
+ }
+ if v.start < chunkStop && chunkStop < v.stop {
+ t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)
+ newVisibles = append(newVisibles, t)
+ // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop)
+ }
+ if chunkStop <= v.start || v.stop <= chunk.Offset {
+ newVisibles = append(newVisibles, v)
+ // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop)
+ }
+ }
+ newVisibles = append(newVisibles, newV)
+
+ logPrintf(" append", newVisibles)
+
+ for i := len(newVisibles) - 1; i >= 0; i-- {
+ if i > 0 && newV.start < newVisibles[i-1].start {
+ newVisibles[i] = newVisibles[i-1]
+ } else {
+ newVisibles[i] = newV
+ break
+ }
+ }
+ logPrintf(" sorted", newVisibles)
+
+ return newVisibles
+}
+
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
- visibles = readResolvedChunks(chunks)
+ visibles2 := readResolvedChunks(chunks)
+
+ if true {
+ return visibles2, err
+ }
+
+ sort.Slice(chunks, func(i, j int) bool {
+ if chunks[i].Mtime == chunks[j].Mtime {
+ filer_pb.EnsureFid(chunks[i])
+ filer_pb.EnsureFid(chunks[j])
+ if chunks[i].Fid == nil || chunks[j].Fid == nil {
+ return true
+ }
+ return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ }
+ return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
+ })
+
+ for _, chunk := range chunks {
+
+ // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
+ visibles = MergeIntoVisibles(visibles, chunk)
+
+ logPrintf("add", visibles)
+
+ }
+
+ if len(visibles) != len(visibles2) {
+ fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
+ } else {
+ for i := 0; i < len(visibles); i++ {
+ checkDifference(visibles[i], visibles2[i])
+ }
+ }
return
+}
+func checkDifference(x, y VisibleInterval) {
+ if x.start != y.start ||
+ x.stop != y.stop ||
+ x.fileId != y.fileId ||
+ x.modifiedTime != y.modifiedTime {
+ fmt.Printf("different visible %+v : %+v\n", x, y)
+ }
}
// find non-overlapping visible intervals
@@ -187,6 +286,19 @@ type VisibleInterval struct {
isGzipped bool
}
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
+ return VisibleInterval{
+ start: start,
+ stop: stop,
+ fileId: fileId,
+ modifiedTime: modifiedTime,
+ chunkOffset: chunkOffset, // the starting position in the chunk
+ chunkSize: chunkSize,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ }
+}
+
func min(x, y int64) int64 {
if x <= y {
return x