author     chrislu <chris.lu@gmail.com>  2022-01-22 05:40:10 -0800
committer  chrislu <chris.lu@gmail.com>  2022-01-22 05:40:10 -0800
commit     8aa6bf0bb9f58c720fb4d60ffa4d4563ec349ff6 (patch)
tree       b56dc014fdb7ba6cf842a400c934972c1e1d34da /weed/filesys
parent     d97bd54e63cb0d17305355f1eca6a252ca9905fe (diff)
refactoring: move logicChunkIndex and chunkSize into MemChunk and simplify the PageChunk interface
Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/page_writer/page_chunk.go       35
-rw-r--r--  weed/filesys/page_writer/upload_pipeline.go  14
2 files changed, 28 insertions, 21 deletions
diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go
index ffb85d3a3..1c892ddb6 100644
--- a/weed/filesys/page_writer/page_chunk.go
+++ b/weed/filesys/page_writer/page_chunk.go
@@ -11,9 +11,9 @@ type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupF
type PageChunk interface {
FreeResource()
WriteDataAt(src []byte, offset int64) (n int)
- ReadDataAt(p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64)
- IsComplete(chunkSize int64) bool
- SaveContent(saveFn SaveToStorageFunc, logicChunkIndex LogicChunkIndex, chunkSize int64)
+ ReadDataAt(p []byte, off int64) (maxStop int64)
+ IsComplete() bool
+ SaveContent(saveFn SaveToStorageFunc)
}
var (
@@ -21,8 +21,19 @@ var (
)
type MemChunk struct {
- buf []byte
- usage *ChunkWrittenIntervalList
+ buf []byte
+ usage *ChunkWrittenIntervalList
+ chunkSize int64
+ logicChunkIndex LogicChunkIndex
+}
+
+func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
+ return &MemChunk{
+ logicChunkIndex: logicChunkIndex,
+ chunkSize: chunkSize,
+ buf: mem.Allocate(int(chunkSize)),
+ usage: newChunkWrittenIntervalList(),
+ }
}
func (mc *MemChunk) FreeResource() {
@@ -35,10 +46,10 @@ func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
return
}
-func (mc *MemChunk) ReadDataAt(p []byte, off int64, logicChunkIndex LogicChunkIndex, chunkSize int64) (maxStop int64) {
- memChunkBaseOffset := int64(logicChunkIndex) * chunkSize
+func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
- logicStart := max(off, int64(logicChunkIndex)*chunkSize+t.StartOffset)
+ logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
if logicStart < logicStop {
copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
@@ -48,17 +59,17 @@ func (mc *MemChunk) ReadDataAt(p []byte, off int64, logicChunkIndex LogicChunkIn
return
}
-func (mc *MemChunk) IsComplete(chunkSize int64) bool {
- return mc.usage.IsComplete(chunkSize)
+func (mc *MemChunk) IsComplete() bool {
+ return mc.usage.IsComplete(mc.chunkSize)
}
-func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc, logicChunkIndex LogicChunkIndex, chunkSize int64) {
+func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
if saveFn == nil {
return
}
for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
- saveFn(reader, int64(logicChunkIndex)*chunkSize+t.StartOffset, t.Size(), func() {
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
})
}
}
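
A minimal sketch (not part of this commit) of how the refactored MemChunk API could be exercised: the chunk now carries its own logicChunkIndex and chunkSize, so callers only pass data and a logical offset. It assumes WriteDataAt, whose body is unchanged and not shown in this diff, records the written interval in usage.

package page_writer_test

import (
	"io"
	"testing"

	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
)

func TestMemChunkRoundTrip(t *testing.T) {
	const chunkSize = int64(8)

	// Chunk index 0, so logical offsets start at 0; the chunk owns both
	// values instead of receiving them on every call.
	mc := page_writer.NewMemChunk(0, chunkSize)
	defer mc.FreeResource()

	// Fill the whole chunk so IsComplete() can report true without a
	// chunkSize argument (assumes WriteDataAt tracks the written interval).
	mc.WriteDataAt([]byte("abcdefgh"), 0)
	if !mc.IsComplete() {
		t.Fatalf("expected a fully written chunk to be complete")
	}

	// Read back through the simplified ReadDataAt signature.
	p := make([]byte, chunkSize)
	if maxStop := mc.ReadDataAt(p, 0); maxStop != chunkSize {
		t.Fatalf("unexpected maxStop %d, data %q", maxStop, p)
	}

	// SaveContent now only takes the save callback; offsets come from the
	// chunk's own logicChunkIndex and chunkSize.
	mc.SaveContent(func(reader io.Reader, offset, size int64, cleanupFn func()) {
		data, _ := io.ReadAll(reader)
		t.Logf("would upload %d bytes at offset %d: %q", size, offset, data)
	})
}
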
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 02a412993..c03a17d2c 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -4,7 +4,6 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/mem"
"sync"
"sync/atomic"
"time"
@@ -62,10 +61,7 @@ func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
memChunk, found := cw.writableChunks[logicChunkIndex]
if !found {
- memChunk = &MemChunk{
- buf: mem.Allocate(int(cw.ChunkSize)),
- usage: newChunkWrittenIntervalList(),
- }
+ memChunk = NewMemChunk(logicChunkIndex, cw.ChunkSize)
cw.writableChunks[logicChunkIndex] = memChunk
}
n = memChunk.WriteDataAt(p, offsetRemainder)
@@ -85,7 +81,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
}
cw.sealedChunksLock.Unlock()
if found {
- maxStop = sealedChunk.chunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize)
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop)
sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex))
}
@@ -97,7 +93,7 @@ func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
if !found {
return
}
- writableMaxStop := writableChunk.ReadDataAt(p, off, logicChunkIndex, cw.ChunkSize)
+ writableMaxStop := writableChunk.ReadDataAt(p, off)
glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop)
maxStop = max(maxStop, writableMaxStop)
@@ -174,7 +170,7 @@ func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
}
func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
- if memChunk.IsComplete(cw.ChunkSize) {
+ if memChunk.IsComplete() {
cw.moveToSealed(memChunk, logicChunkIndex)
}
}
@@ -199,7 +195,7 @@ func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
cw.uploaders.Execute(func() {
// first add to the file chunks
- sealedChunk.chunk.SaveContent(cw.saveToStorageFn, logicChunkIndex, cw.ChunkSize)
+ sealedChunk.chunk.SaveContent(cw.saveToStorageFn)
// notify waiting process
atomic.AddInt32(&cw.uploaderCount, -1)
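
The caller-side changes above follow directly from the narrower PageChunk interface: UploadPipeline no longer threads logicChunkIndex and ChunkSize through ReadDataAt, IsComplete, and SaveContent. A purely hypothetical sketch (not in this commit) of an alternative implementation, living alongside page_chunk.go, only needs to carry that state itself:

package page_writer

// discardChunk is a hypothetical PageChunk implementation that keeps its own
// chunk geometry internally and satisfies the simplified interface; it
// accepts writes but never retains or uploads data.
type discardChunk struct {
	logicChunkIndex LogicChunkIndex
	chunkSize       int64
}

func (c *discardChunk) FreeResource()                                   {}
func (c *discardChunk) WriteDataAt(src []byte, offset int64) (n int)    { return len(src) }
func (c *discardChunk) ReadDataAt(p []byte, off int64) (maxStop int64)  { return off }
func (c *discardChunk) IsComplete() bool                                { return false }
func (c *discardChunk) SaveContent(saveFn SaveToStorageFunc)            {}

// Compile-time check that discardChunk implements PageChunk.
var _ PageChunk = (*discardChunk)(nil)
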