author    hilimd <68371223+hilimd@users.noreply.github.com>  2020-08-20 19:18:23 +0800
committer GitHub <noreply@github.com>  2020-08-20 19:18:23 +0800
commit    b0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (patch)
tree      dcf5b0dfb71089126da5ec3a3fb8eb763a37c739 /weed/filer2
parent    6a93e26fc32ce35901c96371628fd0916b639026 (diff)
parent    f48567c5c62bf8c8cebf568eeb919f25a4fc4289 (diff)
Merge pull request #12 from chrislusf/master
sync
Diffstat (limited to 'weed/filer2')
-rw-r--r--  weed/filer2/entry.go                        |  10
-rw-r--r--  weed/filer2/entry_codec.go                  |   2
-rw-r--r--  weed/filer2/filechunk_manifest.go           |   2
-rw-r--r--  weed/filer2/filechunks.go                   |  41
-rw-r--r--  weed/filer2/filechunks_test.go              | 135
-rw-r--r--  weed/filer2/filer.go                        |  63
-rw-r--r--  weed/filer2/filer_delete_entry.go           |   6
-rw-r--r--  weed/filer2/filer_deletion.go               |  10
-rw-r--r--  weed/filer2/leveldb/leveldb_store_test.go   |   2
-rw-r--r--  weed/filer2/leveldb2/leveldb2_store_test.go |   2
-rw-r--r--  weed/filer2/reader_at.go                    | 100
-rw-r--r--  weed/filer2/reader_at_test.go               | 156
-rw-r--r--  weed/filer2/stream.go                       |   6
13 files changed, 368 insertions(+), 167 deletions(-)
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 00b9b132d..fedfde40d 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -22,6 +22,7 @@ type Attr struct {
GroupNames []string
SymlinkTarget string
Md5 []byte
+ FileSize uint64
}
func (attr Attr) IsDirectory() bool {
@@ -39,7 +40,7 @@ type Entry struct {
}
func (entry *Entry) Size() uint64 {
- return TotalSize(entry.Chunks)
+ return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
}
func (entry *Entry) Timestamp() time.Time {
@@ -81,3 +82,10 @@ func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
Chunks: entry.Chunks,
}
}
+
+func maxUint64(x, y uint64) uint64 {
+ if x > y {
+ return x
+ }
+ return y
+}
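
Note on the Size() change above: an entry can now carry a declared FileSize larger than the bytes its chunks actually cover (for instance a sparse tail that was never written), and Size() reports the larger of the two. A minimal sketch, with hypothetical values:

    // Sketch only; the values are illustrative, not from this commit:
    // one 100-byte chunk, but a declared FileSize of 150.
    entry := &Entry{
        Attr:   Attr{FileSize: 150},
        Chunks: []*filer_pb.FileChunk{{Offset: 0, Size: 100, FileId: "1,ab"}},
    }
    // entry.Size() == maxUint64(TotalSize(entry.Chunks), entry.FileSize) == 150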
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 47c911011..4d615194f 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -53,6 +53,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
Md5: entry.Attr.Md5,
+ FileSize: entry.Attr.FileSize,
}
}
@@ -73,6 +74,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
t.Md5 = attr.Md5
+ t.FileSize = attr.FileSize
return t
}
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index 62d2c6e7f..037b0c1e8 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -64,7 +64,7 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe
return nil, err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) {
buffer.Write(data)
})
if err != nil {
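
Note on the "?readDeleted=true" suffix (the same suffix is added to the three ReadUrlAsStream call sites in stream.go at the end of this diff): it asks the volume server to serve the chunk even if its needle has already been flagged as deleted, so a read that races with a deletion or a vacuum can still complete. For illustration, a hypothetical chunk fetch changes from http://<volume-server>/3,01637037d6 to http://<volume-server>/3,01637037d6?readDeleted=true.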
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index ea7772b4a..1ab6679f9 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -20,6 +20,10 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
return
}
+func FileSize(entry *filer_pb.Entry) (size uint64) {
+ return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+}
+
func ETag(entry *filer_pb.Entry) (etag string) {
if entry.Attributes == nil || entry.Attributes.Md5 == nil {
return ETagChunks(entry.Chunks)
@@ -100,7 +104,7 @@ type ChunkView struct {
FileId string
Offset int64
Size uint64
- LogicOffset int64
+ LogicOffset int64 // actual offset in the file, for the data specified via [offset, offset+size) in current chunk
ChunkSize uint64
CipherKey []byte
IsGzipped bool
@@ -130,17 +134,18 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
for _, chunk := range visibles {
- if chunk.start <= offset && offset < chunk.stop && offset < stop {
+ chunkStart, chunkStop := max(offset, chunk.start), min(stop, chunk.stop)
+
+ if chunkStart < chunkStop {
views = append(views, &ChunkView{
FileId: chunk.fileId,
- Offset: offset - chunk.start, // offset is the data starting location in this file id
- Size: uint64(min(chunk.stop, stop) - offset),
- LogicOffset: offset,
+ Offset: chunkStart - chunk.start + chunk.chunkOffset,
+ Size: uint64(chunkStop - chunkStart),
+ LogicOffset: chunkStart,
ChunkSize: chunk.chunkSize,
CipherKey: chunk.cipherKey,
IsGzipped: chunk.isGzipped,
})
- offset = min(chunk.stop, stop)
}
}
@@ -149,10 +154,11 @@ func ViewFromVisibleIntervals(visibles []VisibleInterval, offset int64, size int
}
func logPrintf(name string, visibles []VisibleInterval) {
+
/*
- log.Printf("%s len %d", name, len(visibles))
+ glog.V(0).Infof("%s len %d", name, len(visibles))
for _, v := range visibles {
- log.Printf("%s: => %+v", name, v)
+ glog.V(0).Infof("%s: [%d,%d)", name, v.start, v.stop)
}
*/
}
@@ -165,7 +171,7 @@ var bufPool = sync.Pool{
func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval {
- newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
+ newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed)
length := len(visibles)
if length == 0 {
@@ -177,13 +183,13 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
}
logPrintf(" before", visibles)
+ chunkStop := chunk.Offset + int64(chunk.Size)
for _, v := range visibles {
if v.start < chunk.Offset && chunk.Offset < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped))
}
- chunkStop := chunk.Offset + int64(chunk.Size)
if v.start < chunkStop && chunkStop < v.stop {
- newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, chunk.Size, v.cipherKey, v.isGzipped))
+ newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped))
}
if chunkStop <= v.start || v.stop <= chunk.Offset {
newVisibles = append(newVisibles, v)
@@ -219,6 +225,7 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu
var newVisibles []VisibleInterval
for _, chunk := range chunks {
+ // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk)
t := visibles[:0]
visibles = newVisibles
@@ -239,17 +246,19 @@ type VisibleInterval struct {
stop int64
modifiedTime int64
fileId string
+ chunkOffset int64
chunkSize uint64
cipherKey []byte
isGzipped bool
}
-func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
+func newVisibleInterval(start, stop int64, fileId string, modifiedTime int64, chunkOffset int64, chunkSize uint64, cipherKey []byte, isGzipped bool) VisibleInterval {
return VisibleInterval{
start: start,
stop: stop,
fileId: fileId,
modifiedTime: modifiedTime,
+ chunkOffset: chunkOffset, // the starting position in the chunk
chunkSize: chunkSize,
cipherKey: cipherKey,
isGzipped: isGzipped,
@@ -262,3 +271,9 @@ func min(x, y int64) int64 {
}
return y
}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}
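
Note on the ViewFromVisibleIntervals rewrite above: the old loop advanced a running offset and only matched a chunk when that offset fell inside it, so any hole in the file silently dropped every later chunk. Each visible interval is now clipped against the requested window [offset, offset+size), and the new chunkOffset field records where the surviving bytes start inside the source chunk. A worked sketch with hypothetical values:

    // Chunk "a" wrote [0,100) at mtime 123; chunk "b" later overwrote [0,70).
    // After merging, [70,100) is still served by "a", and its bytes begin at
    // position 70 inside chunk "a", hence chunkOffset: 70.
    visibles := []VisibleInterval{
        {start: 0, stop: 70, fileId: "b"},
        {start: 70, stop: 100, fileId: "a", chunkOffset: 70},
    }
    views := ViewFromVisibleIntervals(visibles, 60, 20) // read window [60,80)
    // views[0]: FileId "b", Offset 60, Size 10, LogicOffset 60
    // views[1]: FileId "a", Offset 70, Size 10, LogicOffset 70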
diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go
index bfee59198..70da6e16c 100644
--- a/weed/filer2/filechunks_test.go
+++ b/weed/filer2/filechunks_test.go
@@ -1,10 +1,13 @@
package filer2
import (
+ "fmt"
"log"
+ "math"
"testing"
- "fmt"
+ "github.com/stretchr/testify/assert"
+
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
@@ -91,12 +94,12 @@ func TestIntervalMerging(t *testing.T) {
// case 2: updates overwrite part of previous chunks
{
Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 70, FileId: "b", Mtime: 134},
},
Expected: []*VisibleInterval{
- {start: 0, stop: 50, fileId: "asdf"},
- {start: 50, stop: 100, fileId: "abc"},
+ {start: 0, stop: 70, fileId: "b"},
+ {start: 70, stop: 100, fileId: "a", chunkOffset: 70},
},
},
// case 3: updates overwrite full chunks
@@ -126,14 +129,14 @@ func TestIntervalMerging(t *testing.T) {
// case 5: updates overwrite full chunks
{
Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
- {Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "d", Mtime: 184},
+ {Offset: 70, Size: 150, FileId: "c", Mtime: 143},
+ {Offset: 80, Size: 100, FileId: "b", Mtime: 134},
},
Expected: []*VisibleInterval{
- {start: 0, stop: 200, fileId: "asdf"},
- {start: 200, stop: 220, fileId: "abc"},
+ {start: 0, stop: 200, fileId: "d"},
+ {start: 200, stop: 220, fileId: "c", chunkOffset: 130},
},
},
// case 6: same updates
@@ -204,6 +207,10 @@ func TestIntervalMerging(t *testing.T) {
t.Fatalf("failed on test case %d, interval %d, chunkId %s, expect %s",
i, x, interval.fileId, testcase.Expected[x].fileId)
}
+ if interval.chunkOffset != testcase.Expected[x].chunkOffset {
+ t.Fatalf("failed on test case %d, interval %d, chunkOffset %d, expect %d",
+ i, x, interval.chunkOffset, testcase.Expected[x].chunkOffset)
+ }
}
if len(intervals) != len(testcase.Expected) {
t.Fatalf("failed to compact test case %d, len %d expected %d", i, len(intervals), len(testcase.Expected))
@@ -251,14 +258,14 @@ func TestChunksReading(t *testing.T) {
// case 2: updates overwrite part of previous chunks
{
Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 50, FileId: "asdf", Mtime: 134},
+ {Offset: 3, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 10, Size: 50, FileId: "b", Mtime: 134},
},
- Offset: 25,
- Size: 50,
+ Offset: 30,
+ Size: 40,
Expected: []*ChunkView{
- {Offset: 25, Size: 25, FileId: "asdf", LogicOffset: 25},
- {Offset: 0, Size: 25, FileId: "abc", LogicOffset: 50},
+ {Offset: 20, Size: 30, FileId: "b", LogicOffset: 30},
+ {Offset: 57, Size: 10, FileId: "a", LogicOffset: 60},
},
},
// case 3: updates overwrite full chunks
@@ -286,22 +293,22 @@ func TestChunksReading(t *testing.T) {
Size: 400,
Expected: []*ChunkView{
{Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- // {Offset: 0, Size: 150, FileId: "xxxx"}, // missing intervals should not happen
+ {Offset: 0, Size: 150, FileId: "xxxx", LogicOffset: 250},
},
},
// case 5: updates overwrite full chunks
{
Chunks: []*filer_pb.FileChunk{
- {Offset: 0, Size: 100, FileId: "abc", Mtime: 123},
- {Offset: 0, Size: 200, FileId: "asdf", Mtime: 184},
- {Offset: 70, Size: 150, FileId: "abc", Mtime: 143},
+ {Offset: 0, Size: 100, FileId: "a", Mtime: 123},
+ {Offset: 0, Size: 200, FileId: "c", Mtime: 184},
+ {Offset: 70, Size: 150, FileId: "b", Mtime: 143},
{Offset: 80, Size: 100, FileId: "xxxx", Mtime: 134},
},
Offset: 0,
Size: 220,
Expected: []*ChunkView{
- {Offset: 0, Size: 200, FileId: "asdf", LogicOffset: 0},
- {Offset: 0, Size: 20, FileId: "abc", LogicOffset: 200},
+ {Offset: 0, Size: 200, FileId: "c", LogicOffset: 0},
+ {Offset: 130, Size: 20, FileId: "b", LogicOffset: 200},
},
},
// case 6: same updates
@@ -370,18 +377,21 @@ func TestChunksReading(t *testing.T) {
}
for i, testcase := range testcases {
+ if i != 2 {
+ // continue
+ }
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
for x, chunk := range chunks {
log.Printf("read case %d, chunk %d, offset=%d, size=%d, fileId=%s",
i, x, chunk.Offset, chunk.Size, chunk.FileId)
if chunk.Offset != testcase.Expected[x].Offset {
- t.Fatalf("failed on read case %d, chunk %d, Offset %d, expect %d",
- i, x, chunk.Offset, testcase.Expected[x].Offset)
+ t.Fatalf("failed on read case %d, chunk %s, Offset %d, expect %d",
+ i, chunk.FileId, chunk.Offset, testcase.Expected[x].Offset)
}
if chunk.Size != testcase.Expected[x].Size {
- t.Fatalf("failed on read case %d, chunk %d, Size %d, expect %d",
- i, x, chunk.Size, testcase.Expected[x].Size)
+ t.Fatalf("failed on read case %d, chunk %s, Size %d, expect %d",
+ i, chunk.FileId, chunk.Size, testcase.Expected[x].Size)
}
if chunk.FileId != testcase.Expected[x].FileId {
t.Fatalf("failed on read case %d, chunk %d, FileId %s, expect %s",
@@ -418,3 +428,74 @@ func BenchmarkCompactFileChunks(b *testing.B) {
CompactFileChunks(nil, chunks)
}
}
+
+func TestViewFromVisibleIntervals(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 0,
+ stop: 25,
+ fileId: "fid1",
+ },
+ {
+ start: 4096,
+ stop: 8192,
+ fileId: "fid2",
+ },
+ {
+ start: 16384,
+ stop: 18551,
+ fileId: "fid3",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+ if len(views) != len(visibles) {
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ }
+
+}
+
+func TestViewFromVisibleIntervals2(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 344064,
+ stop: 348160,
+ fileId: "fid1",
+ },
+ {
+ start: 348160,
+ stop: 356352,
+ fileId: "fid2",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 0, math.MaxInt32)
+
+ if len(views) != len(visibles) {
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ }
+
+}
+
+func TestViewFromVisibleIntervals3(t *testing.T) {
+ visibles := []VisibleInterval{
+ {
+ start: 1000,
+ stop: 2000,
+ fileId: "fid1",
+ },
+ {
+ start: 3000,
+ stop: 4000,
+ fileId: "fid2",
+ },
+ }
+
+ views := ViewFromVisibleIntervals(visibles, 1700, 1500)
+
+ if len(views) != len(visibles) {
+ assert.Equal(t, len(visibles), len(views), "ViewFromVisibleIntervals error")
+ }
+
+}
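
Note on the three new tests above: they pin down the regression that the clipping rewrite in filechunks.go fixes. With the old running-offset loop, a file with a leading hole (TestViewFromVisibleIntervals2: the read starts at 0 but data starts at 344064) or a read spanning a gap (TestViewFromVisibleIntervals3) returned fewer views than visible intervals, dropping every chunk after the hole. The third case, worked by hand:

    views := ViewFromVisibleIntervals([]VisibleInterval{
        {start: 1000, stop: 2000, fileId: "fid1"},
        {start: 3000, stop: 4000, fileId: "fid2"},
    }, 1700, 1500) // read window [1700,3200)
    // views[0]: fid1, LogicOffset 1700, Size 300 (tail of the first chunk)
    // views[1]: fid2, LogicOffset 3000, Size 200 (head beyond the 1000-byte hole)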
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index dd4c38857..d3dfa5a6f 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -9,8 +9,6 @@ import (
"google.golang.org/grpc"
- "github.com/karlseguin/ccache"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -27,7 +25,6 @@ var (
type Filer struct {
Store *FilerStoreWrapper
- directoryCache *ccache.Cache
MasterClient *wdclient.MasterClient
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
@@ -44,7 +41,6 @@ type Filer struct {
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
@@ -77,10 +73,6 @@ func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
-func (f *Filer) DisableDirectoryCache() {
- f.directoryCache = nil
-}
-
func (fs *Filer) GetMaster() string {
return fs.MasterClient.GetMaster()
}
@@ -117,16 +109,9 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
dirPath := "/" + util.Join(dirParts[:i]...)
// fmt.Printf("%d directory: %+v\n", i, dirPath)
- // first check local cache
- dirEntry := f.cacheGetDirectory(dirPath)
-
- // not found, check the store directly
- if dirEntry == nil {
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ = f.FindEntry(ctx, util.FullPath(dirPath))
- } else {
- // glog.V(4).Infof("found cached directory: %s", dirPath)
- }
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
// no such existing directory
if dirEntry == nil {
@@ -166,9 +151,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return fmt.Errorf("%s is a file", dirPath)
}
- // cache the directory entry
- f.cacheSetDirectory(dirPath, dirEntry, i)
-
// remember the direct parent directory entry
if i == len(dirParts)-1 {
lastDirectoryEntry = dirEntry
@@ -295,45 +277,6 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
return
}
-func (f *Filer) cacheDelDirectory(dirpath string) {
-
- if dirpath == "/" {
- return
- }
-
- if f.directoryCache == nil {
- return
- }
- f.directoryCache.Delete(dirpath)
- return
-}
-
-func (f *Filer) cacheGetDirectory(dirpath string) *Entry {
-
- if f.directoryCache == nil {
- return nil
- }
- item := f.directoryCache.Get(dirpath)
- if item == nil {
- return nil
- }
- return item.Value().(*Entry)
-}
-
-func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
-
- if f.directoryCache == nil {
- return
- }
-
- minutes := 60
- if level < 10 {
- minutes -= level * 6
- }
-
- f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
-}
-
func (f *Filer) Shutdown() {
f.LocalMetaLogBuffer.Shutdown()
f.Store.Shutdown()
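
Note on dropping the directory cache: CreateEntry now resolves every ancestor directory straight from the store. The ccache-based cache was per-process, so invalidation via cacheDelDirectory never reached other filers sharing the same store, and a stale cached directory entry could outlive a concurrent delete; removing the cache trades a few extra store lookups for consistency. A condensed restatement of the new per-level resolution (this only restates the loop body shown in the diff):

    // For each ancestor directory level:
    dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
    if dirEntry == nil {
        // no such directory yet: create it, then descend to the next level
    }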
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
index 35099a472..926569b30 100644
--- a/weed/filer2/filer_delete_entry.go
+++ b/weed/filer2/filer_delete_entry.go
@@ -65,6 +65,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
}
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
+ glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
}
@@ -73,7 +74,6 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
var dirChunks []*filer_pb.FileChunk
if sub.IsDirectory() {
dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
- f.cacheDelDirectory(string(sub.FullPath))
chunks = append(chunks, dirChunks...)
} else {
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster)
@@ -107,9 +107,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
- if entry.IsDirectory() {
- f.cacheDelDirectory(string(entry.FullPath))
- } else {
+ if !entry.IsDirectory() {
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster)
}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index a6b229771..2ff9dac63 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -1,6 +1,7 @@
package filer2
import (
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -50,15 +51,14 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = fileIds[:0]
}
deletionCount = len(toDeleteFileIds)
- deleteResults, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+ _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
if err != nil {
- glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ if !strings.Contains(err.Error(), "already deleted") {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ }
} else {
glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
}
- if len(deleteResults) != deletionCount {
- glog.V(0).Infof("delete %d fileIds actual %d", deletionCount, len(deleteResults))
- }
}
})
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 77df07a9b..81c761f56 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -17,7 +17,6 @@ func TestCreateAndFind(t *testing.T) {
store := &LevelDBStore{}
store.initialize(dir)
filer.SetStore(store)
- filer.DisableDirectoryCache()
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
@@ -72,7 +71,6 @@ func TestEmptyRoot(t *testing.T) {
store := &LevelDBStore{}
store.initialize(dir)
filer.SetStore(store)
- filer.DisableDirectoryCache()
ctx := context.Background()
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index b211d86e4..27c1c954b 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -17,7 +17,6 @@ func TestCreateAndFind(t *testing.T) {
store := &LevelDB2Store{}
store.initialize(dir, 2)
filer.SetStore(store)
- filer.DisableDirectoryCache()
fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
@@ -72,7 +71,6 @@ func TestEmptyRoot(t *testing.T) {
store := &LevelDB2Store{}
store.initialize(dir, 2)
filer.SetStore(store)
- filer.DisableDirectoryCache()
ctx := context.Background()
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index 568d94267..0bf528a42 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -15,12 +15,11 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- buffer []byte
- bufferOffset int64
lookupFileId func(fileId string) (targetUrl string, err error)
readerLock sync.Mutex
+ fileSize int64
- chunkCache *chunk_cache.ChunkCache
+ chunkCache chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
@@ -54,13 +53,13 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
}
}
-func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
+func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient),
- bufferOffset: -1,
chunkCache: chunkCache,
+ fileSize: fileSize,
}
}
@@ -69,75 +68,78 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
- for n < len(p) && err == nil {
- readCount, readErr := c.doReadAt(p[n:], offset+int64(n))
- n += readCount
- err = readErr
- if readCount == 0 {
- return n, io.EOF
- }
- }
- return
+ glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+ return c.doReadAt(p[n:], offset+int64(n))
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
- var found bool
- for _, chunk := range c.chunkViews {
- if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
- found = true
- if c.bufferOffset != chunk.LogicOffset {
- c.buffer, err = c.fetchChunkData(chunk)
- if err != nil {
- glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- }
- c.bufferOffset = chunk.LogicOffset
- }
+ var buffer []byte
+ startOffset, remaining := offset, int64(len(p))
+ for i, chunk := range c.chunkViews {
+ if remaining <= 0 {
break
}
+ if startOffset < chunk.LogicOffset {
+ gap := int(chunk.LogicOffset - startOffset)
+ glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
+ n += int(min(int64(gap), remaining))
+ startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
+ if remaining <= 0 {
+ break
+ }
+ }
+ // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
+ chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+ if chunkStart >= chunkStop {
+ continue
+ }
+ glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+ buffer, err = c.readFromWholeChunkData(chunk)
+ if err != nil {
+ glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
+ return
+ }
+ bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+ copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer[bufferOffset:bufferOffset+chunkStop-chunkStart])
+ n += copied
+ startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
- if !found {
- return 0, io.EOF
- }
- if err == nil {
- n = copy(p, c.buffer[offset-c.bufferOffset:])
+ glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+
+ if err == nil && remaining > 0 && c.fileSize > startOffset {
+ delta := int(min(remaining, c.fileSize - startOffset))
+ glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize)
+ n += delta
}
- // fmt.Printf("> doReadAt [%d,%d), buffer:[%d,%d)\n", offset, offset+int64(n), c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
+ if err == nil && offset+int64(len(p)) > c.fileSize {
+ err = io.EOF
+ }
+ // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
return
}
-func (c *ChunkReadAt) fetchChunkData(chunkView *ChunkView) (data []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
- glog.V(4).Infof("fetchChunkData %s [%d,%d)\n", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
+ glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
- hasDataInCache := false
- chunkData := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
if chunkData != nil {
- glog.V(3).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
- hasDataInCache = true
+ glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
} else {
+ glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
if err != nil {
- return nil, err
+ return
}
- }
-
- if int64(len(chunkData)) < chunkView.Offset+int64(chunkView.Size) {
- glog.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
- return nil, fmt.Errorf("unexpected larger cached:%v chunk %s [%d,%d) than %d", hasDataInCache, chunkView.FileId, chunkView.Offset, chunkView.Offset+int64(chunkView.Size), len(chunkData))
- }
-
- data = chunkData[chunkView.Offset : chunkView.Offset+int64(chunkView.Size)]
-
- if !hasDataInCache {
c.chunkCache.SetChunk(chunkView.FileId, chunkData)
}
- return data, nil
+ return
}
func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
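
Note on the doReadAt rewrite above: instead of buffering one chunk and returning io.EOF on any miss, the reader now walks the chunk views in logical order, zero-fills any gap before a chunk (sparse regions), copies the requested range out of a whole-chunk buffer (cache hit or full fetch), zero-fills the tail up to fileSize, and returns io.EOF only when the request extends past fileSize. A sketch reusing the mockChunkCache from the new test file below, with hypothetical values:

    // One chunk view covering [2,5) of a 10-byte file.
    readerAt := &ChunkReadAt{
        chunkViews: ViewFromVisibleIntervals([]VisibleInterval{
            {start: 2, stop: 5, fileId: "1", chunkSize: 9},
        }, 0, math.MaxInt64),
        fileSize:   10,
        chunkCache: &mockChunkCache{},
    }
    p := make([]byte, 12)
    n, err := readerAt.ReadAt(p, 0)
    // n == 10: zero-fill [0,2), copy chunk bytes [2,5), zero-fill [5,10).
    // err == io.EOF because the 12-byte request reaches past the 10-byte file.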
diff --git a/weed/filer2/reader_at_test.go b/weed/filer2/reader_at_test.go
new file mode 100644
index 000000000..7377c5dbc
--- /dev/null
+++ b/weed/filer2/reader_at_test.go
@@ -0,0 +1,156 @@
+package filer2
+
+import (
+ "fmt"
+ "io"
+ "math"
+ "strconv"
+ "sync"
+ "testing"
+)
+
+type mockChunkCache struct {
+}
+
+func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
+ x, _ := strconv.Atoi(fileId)
+ data = make([]byte, minSize)
+ for i := 0; i < int(minSize); i++ {
+ data[i] = byte(x)
+ }
+ return data
+}
+func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
+}
+
+func TestReaderAt(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 1,
+ stop: 2,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ {
+ start: 3,
+ stop: 4,
+ fileId: "3",
+ chunkSize: 1,
+ },
+ {
+ start: 5,
+ stop: 6,
+ fileId: "5",
+ chunkSize: 2,
+ },
+ {
+ start: 7,
+ stop: 9,
+ fileId: "7",
+ chunkSize: 2,
+ },
+ {
+ start: 9,
+ stop: 10,
+ fileId: "9",
+ chunkSize: 2,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 10, 10, nil)
+ testReadAt(t, readerAt, 0, 12, 10, io.EOF)
+ testReadAt(t, readerAt, 2, 8, 8, nil)
+ testReadAt(t, readerAt, 3, 6, 6, nil)
+
+}
+
+func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
+ data := make([]byte, size)
+ n, err := readerAt.ReadAt(data, offset)
+
+ for _, d := range data {
+ fmt.Printf("%x", d)
+ }
+ fmt.Println()
+
+ if expected != n {
+ t.Errorf("unexpected read size: %d, expect: %d", n, expected)
+ }
+ if err != expectedErr {
+ t.Errorf("unexpected read error: %v, expect: %v", err, expectedErr)
+ }
+
+}
+
+func TestReaderAt0(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ {
+ start: 7,
+ stop: 9,
+ fileId: "2",
+ chunkSize: 9,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 10, 10, nil)
+ testReadAt(t, readerAt, 3, 16, 7, io.EOF)
+ testReadAt(t, readerAt, 3, 5, 5, nil)
+
+ testReadAt(t, readerAt, 11, 5, 0, io.EOF)
+ testReadAt(t, readerAt, 10, 5, 0, io.EOF)
+
+}
+
+func TestReaderAt1(t *testing.T) {
+
+ visibles := []VisibleInterval{
+ {
+ start: 2,
+ stop: 5,
+ fileId: "1",
+ chunkSize: 9,
+ },
+ }
+
+ readerAt := &ChunkReadAt{
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 20,
+ chunkCache: &mockChunkCache{},
+ }
+
+ testReadAt(t, readerAt, 0, 20, 20, nil)
+ testReadAt(t, readerAt, 1, 7, 7, nil)
+ testReadAt(t, readerAt, 0, 1, 1, nil)
+ testReadAt(t, readerAt, 18, 4, 2, io.EOF)
+ testReadAt(t, readerAt, 12, 4, 4, nil)
+ testReadAt(t, readerAt, 4, 20, 16, io.EOF)
+ testReadAt(t, readerAt, 4, 10, 10, nil)
+ testReadAt(t, readerAt, 1, 10, 10, nil)
+
+}
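
Note on mockChunkCache: it compiles because reader_at.go (above) changed the chunkCache field from the concrete *chunk_cache.ChunkCache to the interface type chunk_cache.ChunkCache. Judging from the two methods the reader calls and the mock implements, the interface is:

    type ChunkCache interface {
        GetChunk(fileId string, minSize uint64) (data []byte)
        SetChunk(fileId string, data []byte)
    }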
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index e9707d3ae..fee9d45da 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -32,7 +32,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
for _, chunkView := range chunkViews {
urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
w.Write(data)
})
if err != nil {
@@ -63,7 +63,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return nil, err
}
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {
@@ -175,7 +175,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if err != nil {