aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filesys/dirty_pages_mem_chunk.go46
1 files changed, 14 insertions, 32 deletions
diff --git a/weed/filesys/dirty_pages_mem_chunk.go b/weed/filesys/dirty_pages_mem_chunk.go
index fc5669ec0..9313c4562 100644
--- a/weed/filesys/dirty_pages_mem_chunk.go
+++ b/weed/filesys/dirty_pages_mem_chunk.go
@@ -46,8 +46,7 @@ func (pages *MemoryChunkPages) FlushData() error {
if !pages.hasWrites {
return nil
}
- pages.saveChunkedFileToStorage()
- pages.writeWaitGroup.Wait()
+ pages.uploadPipeline.FlushAll()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
@@ -65,41 +64,24 @@ func (pages *MemoryChunkPages) GetStorageOptions() (collection, replication stri
return pages.collection, pages.replication
}
-func (pages *MemoryChunkPages) saveChunkedFileToStorage() {
-
- pages.uploadPipeline.FlushAll()
-
-}
-
func (pages *MemoryChunkPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
mtime := time.Now().UnixNano()
- pages.writeWaitGroup.Add(1)
- writer := func() {
- defer pages.writeWaitGroup.Done()
- defer cleanupFn()
-
- chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
- if err != nil {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
- pages.lastErr = err
- return
- }
- chunk.Mtime = mtime
- pages.collection, pages.replication = collection, replication
- pages.chunkAddLock.Lock()
- pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
- pages.fh.entryViewCache = nil
- glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
- pages.chunkAddLock.Unlock()
-
- }
+ defer cleanupFn()
- if pages.fh.f.wfs.concurrentWriters != nil {
- pages.fh.f.wfs.concurrentWriters.Execute(writer)
- } else {
- go writer()
+ chunk, collection, replication, err := pages.fh.f.wfs.saveDataAsChunk(pages.fh.f.fullpath())(reader, pages.fh.f.Name, offset)
+ if err != nil {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.fh.f.fullpath(), offset, offset+size, err)
+ pages.lastErr = err
+ return
}
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ pages.fh.f.addChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.entryViewCache = nil
+ glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.fh.f.fullpath(), chunk.FileId, offset, offset+size)
+ pages.chunkAddLock.Unlock()
}