| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-08-20 19:18:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-20 19:18:23 +0800 |
| commit | b0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (patch) | |
| tree | dcf5b0dfb71089126da5ec3a3fb8eb763a37c739 /weed/filer2 | |
| parent | 6a93e26fc32ce35901c96371628fd0916b639026 (diff) | |
| parent | f48567c5c62bf8c8cebf568eeb919f25a4fc4289 (diff) | |
| download | seaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.tar.xz, seaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.zip | |
Merge pull request #12 from chrislusf/master
sync
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/entry.go | 10 |
| -rw-r--r-- | weed/filer2/entry_codec.go | 2 |
| -rw-r--r-- | weed/filer2/filechunk_manifest.go | 2 |
| -rw-r--r-- | weed/filer2/filechunks.go | 41 |
| -rw-r--r-- | weed/filer2/filechunks_test.go | 135 |
| -rw-r--r-- | weed/filer2/filer.go | 63 |
| -rw-r--r-- | weed/filer2/filer_delete_entry.go | 6 |
| -rw-r--r-- | weed/filer2/filer_deletion.go | 10 |
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store_test.go | 2 |
| -rw-r--r-- | weed/filer2/leveldb2/leveldb2_store_test.go | 2 |
| -rw-r--r-- | weed/filer2/reader_at.go | 100 |
| -rw-r--r-- | weed/filer2/reader_at_test.go | 156 |
| -rw-r--r-- | weed/filer2/stream.go | 6 |
13 files changed, 368 insertions, 167 deletions
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 00b9b132d..fedfde40d 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -22,6 +22,7 @@ type Attr struct {
 	GroupNames    []string
 	SymlinkTarget string
 	Md5           []byte
+	FileSize      uint64
 }
 
 func (attr Attr) IsDirectory() bool {
@@ -39,7 +40,7 @@ type Entry struct {
 }
 
 func (entry *Entry) Size() uint64 {
-	return TotalSize(entry.Chunks)
+	return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
 }
 
 func (entry *Entry) Timestamp() time.Time {
@@ -81,3 +82,10 @@ func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
 		Chunks: entry.Chunks,
 	}
 }
+
+func maxUint64(x, y uint64) uint64 {
+	if x > y {
+		return x
+	}
+	return y
+}
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 47c911011..4d615194f 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -53,6 +53,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
 		GroupName:     entry.Attr.GroupNames,
 		SymlinkTarget: entry.Attr.SymlinkTarget,
 		Md5:           entry.Attr.Md5,
+		FileSize:      entry.Attr.FileSize,
 	}
 }
 
@@ -73,6 +74,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
 	t.GroupNames = attr.GroupName
 	t.SymlinkTarget = attr.SymlinkTarget
 	t.Md5 = attr.Md5
+	t.FileSize = attr.FileSize
 
 	return t
 }
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index 62d2c6e7f..037b0c1e8 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -64,7 +64,7 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe
 		return nil, err
 	}
 	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+	err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) {
 		buffer.Write(data)
 	})
 	if err != nil {
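The entry.go and entry_codec.go hunks above persist a logical FileSize attribute alongside the chunk list, and Entry.Size() becomes the larger of the two. A minimal standalone sketch of the effect (hypothetical values, not repository code): a file that was extended past its last chunk, for example by a truncate, is now reported at its logical size rather than its chunk extent.

```go
package main

import "fmt"

// maxUint64 mirrors the helper added to entry.go.
func maxUint64(x, y uint64) uint64 {
	if x > y {
		return x
	}
	return y
}

func main() {
	// Chunks cover only 4096 bytes, but the FileSize attribute records a
	// logical size of 10240 bytes (e.g. a sparse or truncate-extended file).
	totalChunkSize := uint64(4096)
	attrFileSize := uint64(10240)

	// Entry.Size() now reports the logical size, not just the chunk extent.
	fmt.Println(maxUint64(totalChunkSize, attrFileSize)) // prints 10240
}
```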
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index ea7772b4a..1ab6679f9 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -20,6 +20,10 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
 	return
 }
 
+func FileSize(entry *filer_pb.Entry) (size uint64) {
+	return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+}
+
 func ETag(entry *filer_pb.Entry) (etag string) {
 	if entry.Attributes == nil || entry.Attributes.Md5 == nil {
 		return ETagChunks(entry.Chunks)
@@ -100,7 +104,7 @@ type ChunkView struct {
 	FileId      string
 	Offset      int64
 	Size        uint64
-	LogicOffset int64
+	LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
 	ChunkSize   uint64
 	CipherKey   []byte
 	IsGzipped   bool
@@ -130,17 +134,18 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
 
 	for _, chunk := range visibles {
 
-		if chunk.start <= offset && offset < chunk.stop && offset < stop {
+		chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
+
+		if chunkStart < chunkStop {
 			views = append(views, &ChunkView{
 				FileId:      chunk.fileId,
-				Offset:      offset - chunk.start, // offset is the data starting location in this file id
-				Size:        uint64(min(chunk.stop, stop) - offset),
-				LogicOffset: offset,
+				Offset:      chunkStart - chunk.start + chunk.chunkOffset,
+				Size:        uint64(chunkStop - chunkStart),
+				LogicOffset: chunkStart,
 				ChunkSize:   chunk.chunkSize,
 				CipherKey:   chunk.cipherKey,
 				IsGzipped:   chunk.isGzipped,
 			})
-			offset = min(chunk.stop, stop)
 		}
 	}
 
@@ -149,10 +154,11 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
 }
 
 func logPrintf(name string, visibles []VisibleInterval) {
+	/*
-	log.Printf("%s len %d", name, len(visibles))
+	glog.V(0).Infof("%s len %d", name, len(visibles))
 	for _, v := range visibles {
-		log.Printf("%s: => %+v", name, v)
+		glog.V(0).Infof("%s: [%d,%d)", name, v.start, v.stop)
 	}
 	*/
 }
@@ -165,7 +171,7 @@ var bufPool = sync.Pool{
 
 func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
 
-	newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
+	newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
 
 	length := len(visibles)
 	if length == 0 {
@@ -177,13 +183,13 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
 	}
 
 	logPrintf("  before", visibles)
+	chunkStop := chunk.Offset + int64(chunk.Size)
 	for _, v := range visibles {
 		if v.start < chunk.Offset && chunk.Offset < v.stop {
-			newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
+			newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped))
 		}
-		chunkStop := chunk.Offset + int64(chunk.Size)
 		if v.start < chunkStop && chunkStop < v.stop {
-			newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
+			newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped))
 		}
 		if chunkStop <= v.start || v.stop <= chunk.Offset {
 			newVisibles = append(newVisibles, v)
@@ -219,6 +225,7 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu
 
 	var newVisibles []VisibleInterval
 	for _, chunk := range chunks {
 
+		// glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
 		newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
 		t := visibles[:0]
 		visibles = newVisibles
@@ -239,17 +246,19 @@ type VisibleInterval struct {
 	stop         int64
 	modifiedTime int64
 	fileId       string
+	chunkOffset  int64
 	chunkSize    uint64
 	cipherKey    []byte
 	isGzipped    bool
 }
 
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
 	return VisibleInterval{
 		start:        start,
 		stop:         stop,
 		fileId:       fileId,
 		modifiedTime: modifiedTime,
+		chunkOffset:  chunkOffset, // the starting position in the chunk
 		chunkSize:    chunkSize,
 		cipherKey:    cipherKey,
 		isGzipped:    isGzipped,
@@ -262,3 +271,9 @@ func min(x, y int64) int64 {
 	}
 	return y
 }
+func max(x, y int64) int64 {
+	if x <= y {
+		return y
+	}
+	return x
+}
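The rewritten ViewFromVisibleIntervals clamps the requested range against each visible interval with max/min instead of advancing a moving offset, and the new chunkOffset field records where a surviving interval begins inside the chunk it references. A standalone sketch of that arithmetic (simplified types; the read range is borrowed from TestViewFromVisibleIntervals3 in the test diff below):

```go
package main

import "fmt"

func min(x, y int64) int64 {
	if x <= y {
		return x
	}
	return y
}

func max(x, y int64) int64 {
	if x <= y {
		return y
	}
	return x
}

// visible is a simplified stand-in for VisibleInterval.
type visible struct {
	start, stop, chunkOffset int64
}

func main() {
	// Read [1700,3200) against two visible intervals [1000,2000) and [3000,4000).
	offset, stop := int64(1700), int64(3200)
	for _, v := range []visible{{1000, 2000, 0}, {3000, 4000, 0}} {
		chunkStart, chunkStop := max(offset, v.start), min(stop, v.stop)
		if chunkStart < chunkStop {
			// The offset into the chunk adds chunkOffset, the interval's
			// own starting position inside the chunk it references.
			fmt.Printf("view [%d,%d): chunk offset %d, size %d\n",
				chunkStart, chunkStop, chunkStart-v.start+v.chunkOffset, chunkStop-chunkStart)
		}
	}
	// view [1700,2000): chunk offset 700, size 300
	// view [3000,3200): chunk offset 0, size 200
}
```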
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index bfee59198..70da6e16c 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -1,10 +1,13 @@
 package filer2
 
 import (
+	"fmt"
 	"log"
+	"math"
 	"testing"
 
-	"fmt"
+	"github.com/stretchr/testify/assert"
+
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 )
 
@@ -91,12 +94,12 @@ func TestIntervalMerging(t *testing.T) {
 		// case 2: updates overwrite part of previous chunks
 		{
 			Chunks: []*filer_pb.FileChunk{
-				{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
-				{Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
+				{Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+				{Offset: 0, Size: 70, FileId: "b", Mtime: 134},
 			},
 			Expected: []*VisibleInterval{
-				{start: 0, stop: 50, fileId: "asdf"},
-				{start: 50, stop: 100, fileId: "abc"},
+				{start: 0, stop: 70, fileId: "b"},
+				{start: 70, stop: 100, fileId: "a", chunkOffset: 70},
 			},
 		},
 		// case 3: updates overwrite full chunks
@@ -126,14 +129,14 @@ func TestIntervalMerging(t *testing.T) {
 		// case 5: updates overwrite full chunks
 		{
 			Chunks: []*filer_pb.FileChunk{
-				{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
-				{Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
-				{Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
-				{Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
+				{Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+				{Offset: 0, Size: 200, FileId: "d", Mtime: 184},
+				{Offset: 70, Size: 150, FileId: "c", Mtime: 143},
+				{Offset: 80, Size: 100, FileId: "b", Mtime: 134},
 			},
 			Expected: []*VisibleInterval{
-				{start: 0, stop: 200, fileId: "asdf"},
-				{start: 200, stop: 220, fileId: "abc"},
+				{start: 0, stop: 200, fileId: "d"},
+				{start: 200, stop: 220, fileId: "c", chunkOffset: 130},
 			},
 		},
 		// case 6: same updates
@@ -204,6 +207,10 @@ func TestIntervalMerging(t *testing.T) {
 				t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
 					i, x, interval.fileId, testcase.Expected[x].fileId)
 			}
+			if interval.chunkOffset != testcase.Expected[x].chunkOffset {
+				t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
+					i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
+			}
 		}
 		if len(intervals) != len(testcase.Expected) {
 			t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
@@ -251,14 +258,14 @@ func TestChunksReading(t *testing.T) {
 		// case 2: updates overwrite part of previous chunks
 		{
 			Chunks: []*filer_pb.FileChunk{
-				{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
-				{Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
+				{Offset: 3, Size: 100, FileId: "a", Mtime: 123},
+				{Offset: 10, Size: 50, FileId: "b", Mtime: 134},
 			},
-			Offset: 25,
-			Size:   50,
+			Offset: 30,
+			Size:   40,
 			Expected: []*ChunkView{
-				{Offset: 25, Size: 25, FileId: "asdf", LogicOffset: 25},
-				{Offset: 0, Size: 25, FileId: "abc", LogicOffset: 50},
+				{Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
+				{Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
 			},
 		},
 		// case 3: updates overwrite full chunks
@@ -286,22 +293,22 @@ func TestChunksReading(t *testing.T) {
 			Size: 400,
 			Expected: []*ChunkView{
 				{Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
-				// {Offset: 0, Size: 150, FileId: "xxxx"}, // missing intervals should not happen
+				{Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
 			},
 		},
 		// case 5: updates overwrite full chunks
 		{
 			Chunks: []*filer_pb.FileChunk{
-				{Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
-				{Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
-				{Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
+				{Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+				{Offset: 0, Size: 200, FileId: "c", Mtime: 184},
+				{Offset: 70, Size: 150, FileId: "b", Mtime: 143},
 				{Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
 			},
 			Offset: 0,
 			Size:   220,
 			Expected: []*ChunkView{
-				{Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
-				{Offset: 0, Size: 20, FileId: "abc", LogicOffset: 200},
+				{Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
+				{Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
 			},
 		},
 		// case 6: same updates
@@ -370,18 +377,21 @@ func TestChunksReading(t *testing.T) {
 	}
 
 	for i, testcase := range testcases {
+		if i != 2 {
+			// continue
+		}
 		log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
 		chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
 		for x, chunk := range chunks {
 			log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
 				i, x, chunk.Offset, chunk.Size, chunk.FileId)
 			if chunk.Offset != testcase.Expected[x].Offset {
-				t.Fatalf("failed on read case %d, chunk %d, Offset %d, expect %d",
-					i, x, chunk.Offset, testcase.Expected[x].Offset)
+				t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
+					i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
 			}
 			if chunk.Size != testcase.Expected[x].Size {
-				t.Fatalf("failed on read case %d, chunk %d, Size %d, expect %d",
-					i, x, chunk.Size, testcase.Expected[x].Size)
+				t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
+					i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
 			}
 			if chunk.FileId != testcase.Expected[x].FileId {
 				t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
@@ -418,3 +428,74 @@ func BenchmarkCompactFileChunks(b *testing.B) {
 		CompactFileChunks(nil, chunks)
 	}
 }
+
+func TestViewFromVisibleIntervals(t *testing.T) {
+	visibles := []VisibleInterval{
+		{
+			start:  0,
+			stop:   25,
+			fileId: "fid1",
+		},
+		{
+			start:  4096,
+			stop:   8192,
+			fileId: "fid2",
+		},
+		{
+			start:  16384,
+			stop:   18551,
+			fileId: "fid3",
+		},
+	}
+
+	views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+	if len(views) != len(visibles) {
+		assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+	}
+
+}
+
+func TestViewFromVisibleIntervals2(t *testing.T) {
+	visibles := []VisibleInterval{
+		{
+			start:  344064,
+			stop:   348160,
+			fileId: "fid1",
+		},
+		{
+			start:  348160,
+			stop:   356352,
+			fileId: "fid2",
+		},
+	}
+
+	views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+	if len(views) != len(visibles) {
+		assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+	}
+
+}
+
+func TestViewFromVisibleIntervals3(t *testing.T) {
+	visibles := []VisibleInterval{
+		{
+			start:  1000,
+			stop:   2000,
+			fileId: "fid1",
+		},
+		{
+			start:  3000,
+			stop:   4000,
+			fileId: "fid2",
+		},
+	}
+
+	views := ViewFromVisibleIntervals(visibles, 1700, 1500)
+
+	if len(views) != len(visibles) {
+		assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+	}
+
+}
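The updated case 2 expectations in TestChunksReading above follow directly from the new chunkOffset arithmetic: chunk "a" occupies the file range [3,103) and is overwritten by the newer chunk "b" on [10,60), so a read of [30,70) takes [30,60) from "b" (at offset 20 into it) and [60,70) from the surviving tail of "a" (at offset 57 = 60-3). A quick standalone check of those numbers (an assumed simplification of the visible-interval math):

```go
package main

import "fmt"

func main() {
	// chunk "a": file range [3,103); chunk "b" (newer): [10,60).
	// After merging, "a" survives on [3,10) and [60,103); the tail's
	// offset into chunk "a" is 60-3 = 57.
	aStart, bStart, bStop := int64(3), int64(10), int64(60)
	readStart, readEnd := int64(30), int64(70)

	// Portion served by "b": [30,60), at offset 30-10 = 20 inside "b".
	fmt.Printf("b: logic [%d,%d) offset %d size %d\n",
		readStart, bStop, readStart-bStart, bStop-readStart)
	// Portion served by "a": [60,70), at offset 60-3 = 57 inside "a".
	fmt.Printf("a: logic [%d,%d) offset %d size %d\n",
		bStop, readEnd, bStop-aStart, readEnd-bStop)
}
```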
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index dd4c38857..d3dfa5a6f 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -9,8 +9,6 @@ import (
 
 	"google.golang.org/grpc"
 
-	"github.com/karlseguin/ccache"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -27,7 +25,6 @@ var (
 
 type Filer struct {
 	Store               *FilerStoreWrapper
-	directoryCache      *ccache.Cache
 	MasterClient        *wdclient.MasterClient
 	fileIdDeletionQueue *util.UnboundedQueue
 	GrpcDialOption      grpc.DialOption
@@ -44,7 +41,6 @@ type Filer struct {
 
 func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
 	f := &Filer{
-		directoryCache:      ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
 		MasterClient:        wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
 		fileIdDeletionQueue: util.NewUnboundedQueue(),
 		GrpcDialOption:      grpcDialOption,
@@ -77,10 +73,6 @@ func (f *Filer) GetStore() (store FilerStore) {
 	return f.Store
 }
 
-func (f *Filer) DisableDirectoryCache() {
-	f.directoryCache = nil
-}
-
 func (fs *Filer) GetMaster() string {
 	return fs.MasterClient.GetMaster()
 }
@@ -117,16 +109,9 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
 		dirPath := "/" + util.Join(dirParts[:i]...)
 		// fmt.Printf("%d directory: %+v\n", i, dirPath)
 
-		// first check local cache
-		dirEntry := f.cacheGetDirectory(dirPath)
-
-		// not found, check the store directly
-		if dirEntry == nil {
-			glog.V(4).Infof("find uncached directory: %s", dirPath)
-			dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
-		} else {
-			// glog.V(4).Infof("found cached directory: %s", dirPath)
-		}
+		// check the store directly
+		glog.V(4).Infof("find uncached directory: %s", dirPath)
+		dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
 
 		// no such existing directory
 		if dirEntry == nil {
@@ -166,9 +151,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
 			return fmt.Errorf("%s is a file", dirPath)
 		}
 
-		// cache the directory entry
-		f.cacheSetDirectory(dirPath, dirEntry, i)
-
 		// remember the direct parent directory entry
 		if i == len(dirParts)-1 {
 			lastDirectoryEntry = dirEntry
@@ -295,45 +277,6 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
 	return
 }
 
-func (f *Filer) cacheDelDirectory(dirpath string) {
-
-	if dirpath == "/" {
-		return
-	}
-
-	if f.directoryCache == nil {
-		return
-	}
-	f.directoryCache.Delete(dirpath)
-	return
-}
-
-func (f *Filer) cacheGetDirectory(dirpath string) *Entry {
-
-	if f.directoryCache == nil {
-		return nil
-	}
-	item := f.directoryCache.Get(dirpath)
-	if item == nil {
-		return nil
-	}
-	return item.Value().(*Entry)
-}
-
-func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
-
-	if f.directoryCache == nil {
-		return
-	}
-
-	minutes := 60
-	if level < 10 {
-		minutes -= level * 6
-	}
-
-	f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
-}
-
 func (f *Filer) Shutdown() {
 	f.LocalMetaLogBuffer.Shutdown()
 	f.Store.Shutdown()
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
index 35099a472..926569b30 100644
--- a/weed/filer2/filer_delete_entry.go
+++ b/weed/filer2/filer_delete_entry.go
@@ -65,6 +65,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 		}
 		if lastFileName == "" && !isRecursive && len(entries) > 0 {
 			// only for first iteration in the loop
+			glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
 			return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
 		}
 
@@ -73,7 +74,6 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 			var dirChunks []*filer_pb.FileChunk
 			if sub.IsDirectory() {
 				dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
-				f.cacheDelDirectory(string(sub.FullPath))
 				chunks = append(chunks, dirChunks...)
 			} else {
 				f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
@@ -107,9 +107,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
 	if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
 		return fmt.Errorf("filer store delete: %v", storeDeletionErr)
 	}
-	if entry.IsDirectory() {
-		f.cacheDelDirectory(string(entry.FullPath))
-	} else {
+	if !entry.IsDirectory() {
 		f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
 	}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index a6b229771..2ff9dac63 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -1,6 +1,7 @@
 package filer2
 
 import (
+	"strings"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -50,15 +51,14 @@ func (f *Filer) loopProcessingDeletion() {
 					fileIds = fileIds[:0]
 				}
 				deletionCount = len(toDeleteFileIds)
-				deleteResults, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+				_, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
 				if err != nil {
-					glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+					if !strings.Contains(err.Error(), "already deleted") {
+						glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+					}
 				} else {
 					glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
 				}
-				if len(deleteResults) != deletionCount {
-					glog.V(0).Infof("delete %d fileIds actual %d", deletionCount, len(deleteResults))
-				}
 			}
 		})
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 77df07a9b..81c761f56 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -17,7 +17,6 @@ func TestCreateAndFind(t *testing.T) {
 	store := &LevelDBStore{}
 	store.initialize(dir)
 	filer.SetStore(store)
-	filer.DisableDirectoryCache()
 
 	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
 
@@ -72,7 +71,6 @@ func TestEmptyRoot(t *testing.T) {
 	store := &LevelDBStore{}
 	store.initialize(dir)
 	filer.SetStore(store)
-	filer.DisableDirectoryCache()
 
 	ctx := context.Background()
 
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index b211d86e4..27c1c954b 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -17,7 +17,6 @@ func TestCreateAndFind(t *testing.T) {
 	store := &LevelDB2Store{}
 	store.initialize(dir, 2)
 	filer.SetStore(store)
-	filer.DisableDirectoryCache()
 
 	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
 
@@ -72,7 +71,6 @@ func TestEmptyRoot(t *testing.T) {
 	store := &LevelDB2Store{}
 	store.initialize(dir, 2)
 	filer.SetStore(store)
-	filer.DisableDirectoryCache()
 
 	ctx := context.Background()
 
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index 568d94267..0bf528a42 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -15,12 +15,11 @@ import (
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   []*ChunkView
-	buffer       []byte
-	bufferOffset int64
 	lookupFileId func(fileId string) (targetUrl string, err error)
 	readerLock   sync.Mutex
+	fileSize     int64
 
-	chunkCache *chunk_cache.ChunkCache
+	chunkCache chunk_cache.ChunkCache
 }
 
 // var _ = io.ReaderAt(&ChunkReadAt{})
@@ -54,13 +53,13 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
 	}
 }
 
-func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
 
 	return &ChunkReadAt{
 		chunkViews:   chunkViews,
 		lookupFileId: LookupFn(filerClient),
-		bufferOffset: -1,
 		chunkCache:   chunkCache,
+		fileSize:     fileSize,
 	}
 }
 
@@ -69,75 +68,78 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
 
 	c.readerLock.Lock()
 	defer c.readerLock.Unlock()
 
-	for n < len(p) && err == nil {
-		readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
-		n += readCount
-		err = readErr
-		if readCount == 0 {
-			return n, io.EOF
-		}
-	}
-	return
+	glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+	return c.doReadAt(p[n:], offset+int64(n))
 }
 
 func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
-	var found bool
-	for _, chunk := range c.chunkViews {
-		if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
-			found = true
-			if c.bufferOffset != chunk.LogicOffset {
-				c.buffer, err = c.fetchChunkData(chunk)
-				if err != nil {
-					glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
-				}
-				c.bufferOffset = chunk.LogicOffset
-			}
+	var buffer []byte
+	startOffset, remaining := offset, int64(len(p))
+	for i, chunk := range c.chunkViews {
+		if remaining <= 0 {
 			break
 		}
+		if startOffset < chunk.LogicOffset {
+			gap := int(chunk.LogicOffset - startOffset)
+			glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
+			n += int(min(int64(gap), remaining))
+			startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
+			if remaining <= 0 {
+				break
+			}
+		}
+		// fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+		chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+		if chunkStart >= chunkStop {
+			continue
+		}
+		glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+		buffer, err = c.readFromWholeChunkData(chunk)
+		if err != nil {
+			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
+			return
+		}
+		bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+		copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
+		n += copied
+		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
 	}
 
-	if !found {
-		return 0, io.EOF
-	}
+	glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
 
-	if err == nil {
-		n = copy(p, c.buffer[offset-c.bufferOffset:])
+	if err == nil && remaining > 0 && c.fileSize > startOffset {
+		delta := int(min(remaining, c.fileSize-startOffset))
+		glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
+		n += delta
 	}
 
-	// fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
+	if err == nil && offset+int64(len(p)) > c.fileSize {
+		err = io.EOF
+	}
+	// fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
 
 	return
 }
 
-func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
 
-	glog.V(4).Infof("fetchChunkData %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+	glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
 
-	hasDataInCache := false
-	chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+	chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
 	if chunkData != nil {
-		glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
-		hasDataInCache = true
+		glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
 	} else {
+		glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
 		chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
 		if err != nil {
-			return nil, err
+			return
 		}
-	}
-
-	if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) {
-		glog.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
-		return nil, fmt.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
-	}
-
-	data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
-
-	if !hasDataInCache {
 		c.chunkCache.SetChunk(chunkView.FileId, chunkData)
 	}
 
-	return data, nil
+	return
 }
 
 func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
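The rewritten ReadAt/doReadAt above no longer buffers one chunk at a time: a single call walks all chunk views, zero-fills the gaps between them, pads reads past the last chunk up to fileSize, and reports io.EOF only when the request runs past the end of the file. A rough standalone model of that byte accounting (an assumed simplification that ignores the actual chunk fetching):

```go
package main

import (
	"fmt"
	"io"
)

// readLen models how many bytes the new doReadAt reports for a request
// [offset, offset+reqLen) on a file of fileSize bytes: with zero-fill,
// every byte below fileSize is readable whether or not a chunk covers it.
func readLen(fileSize, offset, reqLen int64) (n int64, err error) {
	end := offset + reqLen
	if end > fileSize {
		end = fileSize
		err = io.EOF // the request extends beyond the file
	}
	if end > offset {
		n = end - offset
	}
	return
}

func main() {
	// A 20-byte file whose only chunk covers [2,5), as in TestReaderAt1 below:
	fmt.Println(readLen(20, 0, 20)) // 20 <nil>  (gaps read back as zeros)
	fmt.Println(readLen(20, 18, 4)) // 2 EOF     (only 2 bytes remain)
	fmt.Println(readLen(20, 4, 20)) // 16 EOF
}
```

These outputs match the expectations in the new reader_at_test.go that follows.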
diff --git a/weed/filer2/reader_at_test.go b/weed/filer2/reader_at_test.go
new file mode 100644
index 000000000..7377c5dbc
--- /dev/null
+++ b/weed/filer2/reader_at_test.go
@@ -0,0 +1,156 @@
+package filer2
+
+import (
+	"fmt"
+	"io"
+	"math"
+	"strconv"
+	"sync"
+	"testing"
+)
+
+type mockChunkCache struct {
+}
+
+func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
+	x, _ := strconv.Atoi(fileId)
+	data = make([]byte, minSize)
+	for i := 0; i < int(minSize); i++ {
+		data[i] = byte(x)
+	}
+	return data
+}
+func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
+}
+
+func TestReaderAt(t *testing.T) {
+
+	visibles := []VisibleInterval{
+		{
+			start:     1,
+			stop:      2,
+			fileId:    "1",
+			chunkSize: 9,
+		},
+		{
+			start:     3,
+			stop:      4,
+			fileId:    "3",
+			chunkSize: 1,
+		},
+		{
+			start:     5,
+			stop:      6,
+			fileId:    "5",
+			chunkSize: 2,
+		},
+		{
+			start:     7,
+			stop:      9,
+			fileId:    "7",
+			chunkSize: 2,
+		},
+		{
+			start:     9,
+			stop:      10,
+			fileId:    "9",
+			chunkSize: 2,
+		},
+	}
+
+	readerAt := &ChunkReadAt{
+		chunkViews:   ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+		lookupFileId: nil,
+		readerLock:   sync.Mutex{},
+		fileSize:     10,
+		chunkCache:   &mockChunkCache{},
+	}
+
+	testReadAt(t, readerAt, 0, 10, 10, nil)
+	testReadAt(t, readerAt, 0, 12, 10, io.EOF)
+	testReadAt(t, readerAt, 2, 8, 8, nil)
+	testReadAt(t, readerAt, 3, 6, 6, nil)
+
+}
+
+func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
+	data := make([]byte, size)
+	n, err := readerAt.ReadAt(data, offset)
+
+	for _, d := range data {
+		fmt.Printf("%x", d)
+	}
+	fmt.Println()
+
+	if expected != n {
+		t.Errorf("unexpected read size: %d, expect: %d", n, expected)
+	}
+	if err != expectedErr {
+		t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr)
+	}
+
+}
+
+func TestReaderAt0(t *testing.T) {
+
+	visibles := []VisibleInterval{
+		{
+			start:     2,
+			stop:      5,
+			fileId:    "1",
+			chunkSize: 9,
+		},
+		{
+			start:     7,
+			stop:      9,
+			fileId:    "2",
+			chunkSize: 9,
+		},
+	}
+
+	readerAt := &ChunkReadAt{
+		chunkViews:   ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+		lookupFileId: nil,
+		readerLock:   sync.Mutex{},
+		fileSize:     10,
+		chunkCache:   &mockChunkCache{},
+	}
+
+	testReadAt(t, readerAt, 0, 10, 10, nil)
+	testReadAt(t, readerAt, 3, 16, 7, io.EOF)
+	testReadAt(t, readerAt, 3, 5, 5, nil)
+
+	testReadAt(t, readerAt, 11, 5, 0, io.EOF)
+	testReadAt(t, readerAt, 10, 5, 0, io.EOF)
+
+}
+
+func TestReaderAt1(t *testing.T) {
+
+	visibles := []VisibleInterval{
+		{
+			start:     2,
+			stop:      5,
+			fileId:    "1",
+			chunkSize: 9,
+		},
+	}
+
+	readerAt := &ChunkReadAt{
+		chunkViews:   ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+		lookupFileId: nil,
+		readerLock:   sync.Mutex{},
+		fileSize:     20,
+		chunkCache:   &mockChunkCache{},
+	}
+
+	testReadAt(t, readerAt, 0, 20, 20, nil)
+	testReadAt(t, readerAt, 1, 7, 7, nil)
+	testReadAt(t, readerAt, 0, 1, 1, nil)
+	testReadAt(t, readerAt, 18, 4, 2, io.EOF)
+	testReadAt(t, readerAt, 12, 4, 4, nil)
+	testReadAt(t, readerAt, 4, 20, 16, io.EOF)
+	testReadAt(t, readerAt, 4, 10, 10, nil)
+	testReadAt(t, readerAt, 1, 10, 10, nil)
+
+}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index e9707d3ae..fee9d45da 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -32,7 +32,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
 
 	for _, chunkView := range chunkViews {
 
 		urlString := fileId2Url[chunkView.FileId]
-		err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+		err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
 			w.Write(data)
 		})
 		if err != nil {
@@ -63,7 +63,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
 		}
-		err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+		err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
 			buffer.Write(data)
 		})
 		if err != nil {
@@ -175,7 +175,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
 		return err
 	}
 	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+	err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
 		buffer.Write(data)
 	})
 	if err != nil {
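Every util.ReadUrlAsStream call site in this merge (stream.go above and filechunk_manifest.go earlier) now appends ?readDeleted=true to the volume server URL, so a chunk whose needle was just marked deleted, for example because a newer write superseded it, can still be streamed while metadata catches up. A trivial sketch of the pattern (withReadDeleted is a hypothetical helper; the real code concatenates the string inline at each call site):

```go
package main

import "fmt"

// withReadDeleted mirrors the URL pattern used in stream.go: the volume
// server is asked to serve the needle even if it is flagged as deleted.
func withReadDeleted(urlString string) string {
	return urlString + "?readDeleted=true"
}

func main() {
	// Hypothetical volume-server chunk URL, for illustration only.
	fmt.Println(withReadDeleted("http://127.0.0.1:8080/3,01637037d6"))
	// http://127.0.0.1:8080/3,01637037d6?readDeleted=true
}
```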
