aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go4
-rw-r--r--weed/filesys/file.go34
-rw-r--r--weed/filesys/filehandle.go15
3 files changed, 39 insertions, 14 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index c1b78a220..11089186f 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -15,10 +15,10 @@ type ContinuousDirtyPages struct {
intervals *ContinuousIntervals
f *File
writeWaitGroup sync.WaitGroup
+ chunkAddLock sync.Mutex
chunkSaveErrChan chan error
chunkSaveErrChanClosed bool
lastErr error
- lock sync.Mutex
collection string
replication string
}
@@ -117,6 +117,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ defer pages.chunkAddLock.Unlock()
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4662c23db..3bffa156e 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -144,7 +144,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
file.entry.Chunks = chunks
- file.entryViewCache = nil
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
file.reader = nil
file.wfs.deleteFileChunks(truncatedChunks)
}
@@ -282,15 +282,37 @@ func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, er
return entry, nil
}
+func lessThan(a, b *filer_pb.FileChunk) bool {
+ if a.Mtime == b.Mtime {
+ return a.Fid.FileKey < b.Fid.FileKey
+ }
+ return a.Mtime < b.Mtime
+}
+
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Mtime == chunks[j].Mtime {
- return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ // find the earliest incoming chunk
+ newChunks := chunks
+ earliestChunk := newChunks[0]
+ for i := 1; i < len(newChunks); i++ {
+ if lessThan(earliestChunk, newChunks[i]) {
+ earliestChunk = newChunks[i]
+ }
+ }
+
+ // pick out-of-order chunks from existing chunks
+ for _, chunk := range file.entry.Chunks {
+ if lessThan(earliestChunk, chunk) {
+ chunks = append(chunks, chunk)
}
- return chunks[i].Mtime < chunks[j].Mtime
+ }
+
+ // sort incoming chunks
+ sort.Slice(chunks, func(i, j int) bool {
+ return lessThan(chunks[i], chunks[j])
})
+ // add to entry view cache
for _, chunk := range chunks {
file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk)
}
@@ -299,7 +321,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
- file.entry.Chunks = append(file.entry.Chunks, chunks...)
+ file.entry.Chunks = append(file.entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 54bde3494..54410a0ba 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -183,16 +183,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
}
if fh.f.isOpen == 0 {
+
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
}
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
- }
- // stop the goroutine
- if !fh.dirtyPages.chunkSaveErrChanClosed {
- fh.dirtyPages.chunkSaveErrChanClosed = true
- close(fh.dirtyPages.chunkSaveErrChan)
+ // stop the goroutine
+ if !fh.dirtyPages.chunkSaveErrChanClosed {
+ fh.dirtyPages.chunkSaveErrChanClosed = true
+ close(fh.dirtyPages.chunkSaveErrChan)
+ }
+
+ fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
return nil
@@ -262,7 +264,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
fh.f.entry.Chunks = append(chunks, manifestChunks...)
- fh.f.entryViewCache = nil
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)