aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/dirty_page.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/dirty_page.go')
-rw-r--r--weed/filesys/dirty_page.go28
1 files changed, 17 insertions, 11 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index dd0c48796..11089186f 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,22 +9,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- concurrentWriterLimit = runtime.NumCPU()
- concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
)
type ContinuousDirtyPages struct {
intervals *ContinuousIntervals
f *File
writeWaitGroup sync.WaitGroup
+ chunkAddLock sync.Mutex
chunkSaveErrChan chan error
chunkSaveErrChanClosed bool
lastErr error
- lock sync.Mutex
collection string
replication string
}
@@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
- chunkSaveErrChan: make(chan error, concurrentWriterLimit),
+ chunkSaveErrChan: make(chan error, runtime.NumCPU()),
}
go func() {
for t := range dirtyPages.chunkSaveErrChan {
@@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+ errChanSize := pages.f.wfs.option.ConcurrentWriters
+ if errChanSize == 0 {
+ errChanSize = runtime.NumCPU()
+ }
if pages.chunkSaveErrChanClosed {
- pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+ pages.chunkSaveErrChan = make(chan error, errChanSize)
pages.chunkSaveErrChanClosed = false
}
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
- go func() {
+ writer := func() {
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
@@ -119,9 +117,17 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ defer pages.chunkAddLock.Unlock()
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
- }()
+ }
+
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
+ }
}
func max(x, y int64) int64 {