diff options
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index dd0c48796..11089186f 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -9,22 +9,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - concurrentWriterLimit = runtime.NumCPU() - concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit) ) type ContinuousDirtyPages struct { intervals *ContinuousIntervals f *File writeWaitGroup sync.WaitGroup + chunkAddLock sync.Mutex chunkSaveErrChan chan error chunkSaveErrChanClosed bool lastErr error - lock sync.Mutex collection string replication string } @@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, concurrentWriterLimit), + chunkSaveErrChan: make(chan error, runtime.NumCPU()), } go func() { for t := range dirtyPages.chunkSaveErrChan { @@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { + errChanSize := pages.f.wfs.option.ConcurrentWriters + if errChanSize == 0 { + errChanSize = runtime.NumCPU() + } if pages.chunkSaveErrChanClosed { - pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit) + pages.chunkSaveErrChan = make(chan error, errChanSize) pages.chunkSaveErrChanClosed = false } mtime := time.Now().UnixNano() pages.writeWaitGroup.Add(1) - go func() { + writer := func() { defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) @@ -119,9 +117,17 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + defer pages.chunkAddLock.Unlock() pages.f.addChunks([]*filer_pb.FileChunk{chunk}) glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - }() + } + + if pages.f.wfs.concurrentWriters != nil { + pages.f.wfs.concurrentWriters.Execute(writer) + } else { + go writer() + } } func max(x, y int64) int64 { |
