| author | Chris Lu <chris.lu@gmail.com> | 2020-10-21 02:16:21 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-10-21 02:16:21 -0700 |
| commit | 3bf0116de1525517c82854de15d8dc3a0b59817b (patch) | |
| tree | a78d58777f8dcfdb88f15c1567fe70b84cc09afa /weed/filesys/dirty_page.go | |
| parent | c31b2542489ea4cddffbf1efedbdb867fb6cdb2f (diff) | |
mount: less channel waiting
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 25 |
1 file changed, 20 insertions, 5 deletions
```diff
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index a200050c4..9080b2aef 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,11 +2,17 @@ package filesys
 
 import (
 	"bytes"
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"io"
+	"runtime"
 	"sync"
 	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+var (
+	concurrentWriterLimit = runtime.NumCPU()
 )
 
 type ContinuousDirtyPages struct {
@@ -15,17 +21,26 @@ type ContinuousDirtyPages struct {
 	writeWaitGroup         sync.WaitGroup
 	chunkSaveErrChan       chan error
 	chunkSaveErrChanClosed bool
+	lastErr                error
 	lock                   sync.Mutex
 	collection             string
 	replication            string
 }
 
 func newDirtyPages(file *File) *ContinuousDirtyPages {
-	return &ContinuousDirtyPages{
+	dirtyPages := &ContinuousDirtyPages{
 		intervals:        &ContinuousIntervals{},
 		f:                file,
-		chunkSaveErrChan: make(chan error, 8),
+		chunkSaveErrChan: make(chan error, concurrentWriterLimit),
 	}
+	go func() {
+		for t := range dirtyPages.chunkSaveErrChan {
+			if t != nil {
+				dirtyPages.lastErr = t
+			}
+		}
+	}()
+	return dirtyPages
 }
 
 func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
@@ -105,7 +120,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 		chunk.Mtime = mtime
 		pages.collection, pages.replication = collection, replication
 		pages.f.addChunks([]*filer_pb.FileChunk{chunk})
-		pages.chunkSaveErrChan <- nil
+		glog.V(0).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
 	}()
 }
 
```
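What the change does: each chunk upload used to push a `nil` onto `chunkSaveErrChan` even on success, so something had to keep reading the channel to stop writers from blocking. After this commit the constructor starts a background goroutine that drains `chunkSaveErrChan` and records the last non-nil error in `lastErr`, the channel buffer is sized to `runtime.NumCPU()` (`concurrentWriterLimit`), and the success path just logs instead of sending. Below is a minimal, self-contained sketch of that drain-and-remember pattern, assuming workers report only failures (the error-sending path is outside this diff); `errorCollector`, `saveChunk`, and the simulated failures are illustrative names, not code from the repository.

```go
// Minimal sketch (not the SeaweedFS code) of the pattern this commit moves to:
// worker goroutines report failures on a buffered error channel, and a single
// background goroutine drains the channel and remembers the last non-nil
// error, so writers never block on error delivery.
package main

import (
	"fmt"
	"runtime"
	"sync"
)

type errorCollector struct {
	errChan chan error    // buffered so writers rarely block
	drained chan struct{} // closed when the drain goroutine exits
	wg      sync.WaitGroup
	mu      sync.Mutex
	lastErr error
}

func newErrorCollector() *errorCollector {
	c := &errorCollector{
		// Buffer sized to the CPU count, mirroring concurrentWriterLimit.
		errChan: make(chan error, runtime.NumCPU()),
		drained: make(chan struct{}),
	}
	go func() {
		defer close(c.drained)
		// Drain every report; keep only the most recent failure.
		for err := range c.errChan {
			if err != nil {
				c.mu.Lock()
				c.lastErr = err
				c.mu.Unlock()
			}
		}
	}()
	return c
}

// saveChunk stands in for an asynchronous chunk upload: on failure it
// reports the error and returns immediately instead of making callers wait.
func (c *errorCollector) saveChunk(id int) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		if id%3 == 0 { // pretend every third chunk fails
			c.errChan <- fmt.Errorf("chunk %d failed", id)
		}
	}()
}

func main() {
	c := newErrorCollector()
	for i := 0; i < 8; i++ {
		c.saveChunk(i)
	}
	c.wg.Wait()      // all uploads have finished reporting
	close(c.errChan) // no more reports; let the drainer finish
	<-c.drained      // every report has been folded into lastErr
	fmt.Println("last error:", c.lastErr)
}
```

The buffered channel lets up to `NumCPU` writers report without waiting, and the single drain goroutine serializes updates to `lastErr`; callers inspect it later (in the sketch, only after closing the channel and waiting for the drainer, which is stricter than the fire-and-forget check implied by the diff).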
