diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-10-21 20:04:11 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-21 20:04:11 +0800 |
| commit | ab1105c52472946efab9713bf15df45e14ff4514 (patch) | |
| tree | 7af939a22f3efbf055054670ec8d2e3f11a79ad6 /weed/filesys/dirty_page.go | |
| parent | 2c40f56e5a2e4792361b6df0bb6e879726f340ab (diff) | |
| parent | 81cf8d04dfcbb84093044de4f10a8a92d9c8bd1c (diff) | |
| download | seaweedfs-ab1105c52472946efab9713bf15df45e14ff4514.tar.xz seaweedfs-ab1105c52472946efab9713bf15df45e14ff4514.zip | |
Merge pull request #31 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 27 |
1 file changed, 21 insertions, 6 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index a200050c4..7a3e859f5 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,11 +2,17 @@ package filesys import ( "bytes" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" + "runtime" "sync" "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var ( + concurrentWriterLimit = runtime.NumCPU() ) type ContinuousDirtyPages struct { @@ -15,17 +21,26 @@ type ContinuousDirtyPages struct { writeWaitGroup sync.WaitGroup chunkSaveErrChan chan error chunkSaveErrChanClosed bool + lastErr error lock sync.Mutex collection string replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { - return &ContinuousDirtyPages{ + dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, 8), + chunkSaveErrChan: make(chan error, concurrentWriterLimit), } + go func() { + for t := range dirtyPages.chunkSaveErrChan { + if t != nil { + dirtyPages.lastErr = t + } + } + }() + return dirtyPages } func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { @@ -84,7 +99,7 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { if pages.chunkSaveErrChanClosed { - pages.chunkSaveErrChan = make(chan error, 8) + pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit) pages.chunkSaveErrChanClosed = false } @@ -105,7 +120,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.chunkSaveErrChan <- nil + glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) }() } |
