aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-13 22:50:44 -0800
committerchrislu <chris.lu@gmail.com>2022-02-13 22:50:44 -0800
commit2b955c171345334a4034888c69547662150ceb91 (patch)
treed2e037582cfc99303b305b81b56b095ada4c9f87
parentf3c1e0052127a165955013cf7ba6483dcbda3391 (diff)
downloadseaweedfs-2b955c171345334a4034888c69547662150ceb91.tar.xz
seaweedfs-2b955c171345334a4034888c69547662150ceb91.zip
support read
-rw-r--r--weed/mount/dirty_pages_chunked.go99
-rw-r--r--weed/mount/filehandle.go94
-rw-r--r--weed/mount/filehandle_map.go26
-rw-r--r--weed/mount/filehandle_read.go114
-rw-r--r--weed/mount/page_writer.go95
-rw-r--r--weed/mount/page_writer/chunk_interval_list.go115
-rw-r--r--weed/mount/page_writer/chunk_interval_list_test.go49
-rw-r--r--weed/mount/page_writer/dirty_pages.go30
-rw-r--r--weed/mount/page_writer/page_chunk.go16
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go69
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go121
-rw-r--r--weed/mount/page_writer/upload_pipeline.go182
-rw-r--r--weed/mount/page_writer/upload_pipeline_lock.go63
-rw-r--r--weed/mount/page_writer/upload_pipeline_test.go47
-rw-r--r--weed/mount/weedfs.go42
-rw-r--r--weed/mount/weedfs_file_read.go29
-rw-r--r--weed/mount/weedfs_filehandle.go6
-rw-r--r--weed/mount/weedfs_write.go84
18 files changed, 1257 insertions, 24 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
new file mode 100644
index 000000000..5ffcff83a
--- /dev/null
+++ b/weed/mount/dirty_pages_chunked.go
@@ -0,0 +1,99 @@
+package mount
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "sync"
+ "time"
+)
+
+type ChunkedDirtyPages struct {
+ fh *FileHandle
+ writeWaitGroup sync.WaitGroup
+ lastErr error
+ collection string
+ replication string
+ uploadPipeline *page_writer.UploadPipeline
+ hasWrites bool
+}
+
+var (
+ _ = page_writer.DirtyPages(&ChunkedDirtyPages{})
+)
+
+func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
+
+ dirtyPages := &ChunkedDirtyPages{
+ fh: fh,
+ }
+
+ dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize, dirtyPages.saveChunkedFileIntevalToStorage, fh.wfs.option.ConcurrentWriters)
+
+ return dirtyPages
+}
+
+func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) {
+ pages.hasWrites = true
+
+ glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh, offset, offset+int64(len(data)))
+ pages.uploadPipeline.SaveDataAt(data, offset)
+
+ return
+}
+
+func (pages *ChunkedDirtyPages) FlushData() error {
+ if !pages.hasWrites {
+ return nil
+ }
+ pages.uploadPipeline.FlushAll()
+ if pages.lastErr != nil {
+ return fmt.Errorf("flush data: %v", pages.lastErr)
+ }
+ return nil
+}
+
+func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ if !pages.hasWrites {
+ return
+ }
+ return pages.uploadPipeline.MaybeReadDataAt(data, startOffset)
+}
+
+func (pages *ChunkedDirtyPages) GetStorageOptions() (collection, replication string) {
+ return pages.collection, pages.replication
+}
+
+func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
+
+ mtime := time.Now().UnixNano()
+ defer cleanupFn()
+
+ fileFullPath := pages.fh.FullPath()
+ fileName := fileFullPath.Name()
+ chunk, collection, replication, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset)
+ if err != nil {
+ glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
+ pages.lastErr = err
+ return
+ }
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.fh.addChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.entryViewCache = nil
+ glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
+
+}
+
+func (pages ChunkedDirtyPages) Destroy() {
+ pages.uploadPipeline.Shutdown()
+}
+
+func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
+ pages.uploadPipeline.LockForRead(startOffset, stopOffset)
+}
+func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
+ pages.uploadPipeline.UnlockForRead(startOffset, stopOffset)
+}
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
new file mode 100644
index 000000000..0d5481b30
--- /dev/null
+++ b/weed/mount/filehandle.go
@@ -0,0 +1,94 @@
+package mount
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "sort"
+ "sync"
+)
+
+type FileHandleId uint64
+
+type FileHandle struct {
+ fh FileHandleId
+ counter int64
+ entry *filer_pb.Entry
+ chunkAddLock sync.Mutex
+ inode uint64
+ wfs *WFS
+
+ // cache file has been written to
+ dirtyPages *PageWriter
+ entryViewCache []filer.VisibleInterval
+ reader io.ReaderAt
+ contentType string
+ handle uint64
+ sync.Mutex
+
+ isDeleted bool
+}
+
+func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
+ fh := &FileHandle{
+ fh: handleId,
+ counter: 1,
+ inode: inode,
+ wfs: wfs,
+ }
+ // dirtyPages: newContinuousDirtyPages(file, writeOnly),
+ fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
+ if entry != nil {
+ entry.Attributes.FileSize = filer.FileSize(entry)
+ }
+
+ return fh
+}
+
+func (fh *FileHandle) FullPath() util.FullPath {
+ return fh.wfs.inodeToPath.GetPath(fh.inode)
+}
+
+func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
+
+ // find the earliest incoming chunk
+ newChunks := chunks
+ earliestChunk := newChunks[0]
+ for i := 1; i < len(newChunks); i++ {
+ if lessThan(earliestChunk, newChunks[i]) {
+ earliestChunk = newChunks[i]
+ }
+ }
+
+ if fh.entry == nil {
+ return
+ }
+
+ // pick out-of-order chunks from existing chunks
+ for _, chunk := range fh.entry.Chunks {
+ if lessThan(earliestChunk, chunk) {
+ chunks = append(chunks, chunk)
+ }
+ }
+
+ // sort incoming chunks
+ sort.Slice(chunks, func(i, j int) bool {
+ return lessThan(chunks[i], chunks[j])
+ })
+
+ glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
+
+ fh.chunkAddLock.Lock()
+ fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
+ fh.entryViewCache = nil
+ fh.chunkAddLock.Unlock()
+}
+
+func lessThan(a, b *filer_pb.FileChunk) bool {
+ if a.Mtime == b.Mtime {
+ return a.Fid.FileKey < b.Fid.FileKey
+ }
+ return a.Mtime < b.Mtime
+}
diff --git a/weed/mount/filehandle_map.go b/weed/mount/filehandle_map.go
index ca010dabb..50ca6bcea 100644
--- a/weed/mount/filehandle_map.go
+++ b/weed/mount/filehandle_map.go
@@ -5,20 +5,12 @@ import (
"sync"
)
-type FileHandleId uint64
-
type FileHandleToInode struct {
sync.RWMutex
nextFh FileHandleId
inode2fh map[uint64]*FileHandle
fh2inode map[FileHandleId]uint64
}
-type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *filer_pb.Entry
- inode uint64
-}
func NewFileHandleToInode() *FileHandleToInode {
return &FileHandleToInode{
@@ -28,16 +20,22 @@ func NewFileHandleToInode() *FileHandleToInode {
}
}
-func (i *FileHandleToInode) GetFileHandle(inode uint64) *FileHandle {
+func (i *FileHandleToInode) GetFileHandle(fh FileHandleId) *FileHandle {
+ i.RLock()
+ defer i.RUnlock()
+ inode, found := i.fh2inode[fh]
+ if found {
+ return i.inode2fh[inode]
+ }
+ return nil
+}
+
+func (i *FileHandleToInode) AcquireFileHandle(wfs *WFS, inode uint64, entry *filer_pb.Entry) *FileHandle {
i.Lock()
defer i.Unlock()
fh, found := i.inode2fh[inode]
if !found {
- fh = &FileHandle{
- fh: i.nextFh,
- counter: 1,
- inode: inode,
- }
+ fh = newFileHandle(wfs, i.nextFh, inode, entry)
i.nextFh++
i.inode2fh[inode] = fh
i.fh2inode[fh.fh] = inode
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
new file mode 100644
index 000000000..71166169e
--- /dev/null
+++ b/weed/mount/filehandle_read.go
@@ -0,0 +1,114 @@
+package mount
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "math"
+)
+
+func (fh *FileHandle) lockForRead(startOffset int64, size int) {
+ fh.dirtyPages.LockForRead(startOffset, startOffset+int64(size))
+}
+func (fh *FileHandle) unlockForRead(startOffset int64, size int) {
+ fh.dirtyPages.UnlockForRead(startOffset, startOffset+int64(size))
+}
+
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+ return
+}
+
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
+
+ fileFullPath := fh.FullPath()
+
+ entry := fh.entry
+ if entry == nil {
+ return 0, io.EOF
+ }
+
+ if entry.IsInRemoteOnly() {
+ glog.V(4).Infof("download remote entry %s", fileFullPath)
+ newEntry, err := fh.downloadRemoteEntry(entry)
+ if err != nil {
+ glog.V(1).Infof("download remote entry %s: %v", fileFullPath, err)
+ return 0, err
+ }
+ entry = newEntry
+ }
+
+ fileSize := int64(filer.FileSize(entry))
+
+ if fileSize == 0 {
+ glog.V(1).Infof("empty fh %v", fileFullPath)
+ return 0, io.EOF
+ }
+
+ if offset+int64(len(buff)) <= int64(len(entry.Content)) {
+ totalRead := copy(buff, entry.Content[offset:])
+ glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
+ return int64(totalRead), nil
+ }
+
+ var chunkResolveErr error
+ if fh.entryViewCache == nil {
+ fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
+ fh.reader = nil
+ }
+
+ reader := fh.reader
+ if reader == nil {
+ chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
+ glog.V(4).Infof("file handle read %s [%d,%d) from %d views", fileFullPath, offset, offset+int64(len(buff)), len(chunkViews))
+ for _, chunkView := range chunkViews {
+ glog.V(4).Infof(" read %s [%d,%d) from chunk %+v", fileFullPath, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.FileId)
+ }
+ reader = filer.NewChunkReaderAtFromClient(fh.wfs.LookupFn(), chunkViews, fh.wfs.chunkCache, fileSize)
+ }
+ fh.reader = reader
+
+ totalRead, err := reader.ReadAt(buff, offset)
+
+ if err != nil && err != io.EOF {
+ glog.Errorf("file handle read %s: %v", fileFullPath, err)
+ }
+
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
+
+ return int64(totalRead), err
+}
+
+func (fh *FileHandle) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
+
+ fileFullPath := fh.FullPath()
+ dir, _ := fileFullPath.DirAndName()
+
+ err := fh.wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.CacheRemoteObjectToLocalClusterRequest{
+ Directory: string(dir),
+ Name: entry.Name,
+ }
+
+ glog.V(4).Infof("download entry: %v", request)
+ resp, err := client.CacheRemoteObjectToLocalCluster(context.Background(), request)
+ if err != nil {
+ return fmt.Errorf("CacheRemoteObjectToLocalCluster file %s: %v", fileFullPath, err)
+ }
+
+ entry = resp.Entry
+
+ fh.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
+
+ return nil
+ })
+
+ return entry, err
+}
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
new file mode 100644
index 000000000..eaf1fc176
--- /dev/null
+++ b/weed/mount/page_writer.go
@@ -0,0 +1,95 @@
+package mount
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type PageWriter struct {
+ fh *FileHandle
+ collection string
+ replication string
+ chunkSize int64
+
+ randomWriter page_writer.DirtyPages
+}
+
+var (
+ _ = page_writer.DirtyPages(&PageWriter{})
+)
+
+func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
+ pw := &PageWriter{
+ fh: fh,
+ chunkSize: chunkSize,
+ randomWriter: newMemoryChunkPages(fh, chunkSize),
+ // randomWriter: newTempFileDirtyPages(fh.f, chunkSize),
+ }
+ return pw
+}
+
+func (pw *PageWriter) AddPage(offset int64, data []byte) {
+
+ glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh, offset, offset+int64(len(data)))
+
+ chunkIndex := offset / pw.chunkSize
+ for i := chunkIndex; len(data) > 0; i++ {
+ writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+ pw.addToOneChunk(i, offset, data[:writeSize])
+ offset += writeSize
+ data = data[writeSize:]
+ }
+}
+
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
+ pw.randomWriter.AddPage(offset, data)
+}
+
+func (pw *PageWriter) FlushData() error {
+ return pw.randomWriter.FlushData()
+}
+
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+ glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh, offset, offset+int64(len(data)))
+
+ chunkIndex := offset / pw.chunkSize
+ for i := chunkIndex; len(data) > 0; i++ {
+ readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+
+ maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+
+ offset += readSize
+ data = data[readSize:]
+ }
+
+ return
+}
+
+func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
+ return pw.randomWriter.GetStorageOptions()
+}
+
+func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) {
+ pw.randomWriter.LockForRead(startOffset, stopOffset)
+}
+
+func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) {
+ pw.randomWriter.UnlockForRead(startOffset, stopOffset)
+}
+
+func (pw *PageWriter) Destroy() {
+ pw.randomWriter.Destroy()
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go
new file mode 100644
index 000000000..e6dc5d1f5
--- /dev/null
+++ b/weed/mount/page_writer/chunk_interval_list.go
@@ -0,0 +1,115 @@
+package page_writer
+
+import "math"
+
+// ChunkWrittenInterval mark one written interval within one page chunk
+type ChunkWrittenInterval struct {
+ StartOffset int64
+ stopOffset int64
+ prev *ChunkWrittenInterval
+ next *ChunkWrittenInterval
+}
+
+func (interval *ChunkWrittenInterval) Size() int64 {
+ return interval.stopOffset - interval.StartOffset
+}
+
+func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
+ return interval.stopOffset-interval.StartOffset == chunkSize
+}
+
+// ChunkWrittenIntervalList mark written intervals within one page chunk
+type ChunkWrittenIntervalList struct {
+ head *ChunkWrittenInterval
+ tail *ChunkWrittenInterval
+}
+
+func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
+ list := &ChunkWrittenIntervalList{
+ head: &ChunkWrittenInterval{
+ StartOffset: -1,
+ stopOffset: -1,
+ },
+ tail: &ChunkWrittenInterval{
+ StartOffset: math.MaxInt64,
+ stopOffset: math.MaxInt64,
+ },
+ }
+ list.head.next = list.tail
+ list.tail.prev = list.head
+ return list
+}
+
+func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
+ interval := &ChunkWrittenInterval{
+ StartOffset: startOffset,
+ stopOffset: stopOffset,
+ }
+ list.addInterval(interval)
+}
+
+func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
+ return list.size() == 1 && list.head.next.isComplete(chunkSize)
+}
+func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
+ for t := list.head; t != nil; t = t.next {
+ writtenByteCount += t.Size()
+ }
+ return
+}
+
+func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
+
+ p := list.head
+ for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
+ }
+ q := list.tail
+ for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
+ }
+
+ if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
+ // merge p and q together
+ p.stopOffset = q.stopOffset
+ unlinkNodesBetween(p, q.next)
+ return
+ }
+ if interval.StartOffset <= p.stopOffset {
+ // merge new interval into p
+ p.stopOffset = interval.stopOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+ if q.StartOffset <= interval.stopOffset {
+ // merge new interval into q
+ q.StartOffset = interval.StartOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+
+ // add the new interval between p and q
+ unlinkNodesBetween(p, q)
+ p.next = interval
+ interval.prev = p
+ q.prev = interval
+ interval.next = q
+
+}
+
+// unlinkNodesBetween remove all nodes after start and before stop, exclusive
+func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
+ if start.next == stop {
+ return
+ }
+ start.next.prev = nil
+ start.next = stop
+ stop.prev.next = nil
+ stop.prev = start
+}
+
+func (list *ChunkWrittenIntervalList) size() int {
+ var count int
+ for t := list.head; t != nil; t = t.next {
+ count++
+ }
+ return count - 2
+}
diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go
new file mode 100644
index 000000000..b22f5eb5d
--- /dev/null
+++ b/weed/mount/page_writer/chunk_interval_list_test.go
@@ -0,0 +1,49 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_PageChunkWrittenIntervalList(t *testing.T) {
+ list := newChunkWrittenIntervalList()
+
+ assert.Equal(t, 0, list.size(), "empty list")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "one interval")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "duplicated interval2")
+
+ list.MarkWritten(95, 100)
+ assert.Equal(t, 2, list.size(), "two intervals")
+
+ list.MarkWritten(50, 60)
+ assert.Equal(t, 3, list.size(), "three intervals")
+
+ list.MarkWritten(50, 55)
+ assert.Equal(t, 3, list.size(), "three intervals merge")
+
+ list.MarkWritten(40, 50)
+ assert.Equal(t, 3, list.size(), "three intervals grow forward")
+
+ list.MarkWritten(50, 65)
+ assert.Equal(t, 3, list.size(), "three intervals grow backward")
+
+ list.MarkWritten(70, 80)
+ assert.Equal(t, 4, list.size(), "four intervals")
+
+ list.MarkWritten(60, 70)
+ assert.Equal(t, 3, list.size(), "three intervals merged")
+
+ list.MarkWritten(59, 71)
+ assert.Equal(t, 3, list.size(), "covered three intervals")
+
+ list.MarkWritten(5, 59)
+ assert.Equal(t, 2, list.size(), "covered two intervals")
+
+ list.MarkWritten(70, 99)
+ assert.Equal(t, 1, list.size(), "covered one intervals")
+
+}
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
new file mode 100644
index 000000000..25b747fad
--- /dev/null
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -0,0 +1,30 @@
+package page_writer
+
+type DirtyPages interface {
+ AddPage(offset int64, data []byte)
+ FlushData() error
+ ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
+ GetStorageOptions() (collection, replication string)
+ Destroy()
+ LockForRead(startOffset, stopOffset int64)
+ UnlockForRead(startOffset, stopOffset int64)
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
+func minInt(x, y int) int {
+ if x < y {
+ return x
+ }
+ return y
+}
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
new file mode 100644
index 000000000..4e8f31425
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk.go
@@ -0,0 +1,16 @@
+package page_writer
+
+import (
+ "io"
+)
+
+type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
+
+type PageChunk interface {
+ FreeResource()
+ WriteDataAt(src []byte, offset int64) (n int)
+ ReadDataAt(p []byte, off int64) (maxStop int64)
+ IsComplete() bool
+ WrittenSize() int64
+ SaveContent(saveFn SaveToStorageFunc)
+}
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
new file mode 100644
index 000000000..dfd54c19e
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -0,0 +1,69 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+)
+
+var (
+ _ = PageChunk(&MemChunk{})
+)
+
+type MemChunk struct {
+ buf []byte
+ usage *ChunkWrittenIntervalList
+ chunkSize int64
+ logicChunkIndex LogicChunkIndex
+}
+
+func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
+ return &MemChunk{
+ logicChunkIndex: logicChunkIndex,
+ chunkSize: chunkSize,
+ buf: mem.Allocate(int(chunkSize)),
+ usage: newChunkWrittenIntervalList(),
+ }
+}
+
+func (mc *MemChunk) FreeResource() {
+ mem.Free(mc.buf)
+}
+
+func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
+ innerOffset := offset % mc.chunkSize
+ n = copy(mc.buf[innerOffset:], src)
+ mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ return
+}
+
+func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
+ for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
+ logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
+ logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
+ if logicStart < logicStop {
+ copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ return
+}
+
+func (mc *MemChunk) IsComplete() bool {
+ return mc.usage.IsComplete(mc.chunkSize)
+}
+
+func (mc *MemChunk) WrittenSize() int64 {
+ return mc.usage.WrittenSize()
+}
+
+func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
+ if saveFn == nil {
+ return
+ }
+ for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
+ reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
+ })
+ }
+}
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
new file mode 100644
index 000000000..486557629
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -0,0 +1,121 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "os"
+)
+
+var (
+ _ = PageChunk(&SwapFileChunk{})
+)
+
+type ActualChunkIndex int
+
+type SwapFile struct {
+ dir string
+ file *os.File
+ logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
+ chunkSize int64
+}
+
+type SwapFileChunk struct {
+ swapfile *SwapFile
+ usage *ChunkWrittenIntervalList
+ logicChunkIndex LogicChunkIndex
+ actualChunkIndex ActualChunkIndex
+}
+
+func NewSwapFile(dir string, chunkSize int64) *SwapFile {
+ return &SwapFile{
+ dir: dir,
+ file: nil,
+ logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
+ chunkSize: chunkSize,
+ }
+}
+func (sf *SwapFile) FreeResource() {
+ if sf.file != nil {
+ sf.file.Close()
+ os.Remove(sf.file.Name())
+ }
+}
+
+func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+ if sf.file == nil {
+ var err error
+ sf.file, err = os.CreateTemp(sf.dir, "")
+ if err != nil {
+ glog.Errorf("create swap file: %v", err)
+ return nil
+ }
+ }
+ actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
+ if !found {
+ actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
+ sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+ }
+
+ return &SwapFileChunk{
+ swapfile: sf,
+ usage: newChunkWrittenIntervalList(),
+ logicChunkIndex: logicChunkIndex,
+ actualChunkIndex: actualChunkIndex,
+ }
+}
+
+func (sc *SwapFileChunk) FreeResource() {
+}
+
+func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+ innerOffset := offset % sc.swapfile.chunkSize
+ var err error
+ n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
+ if err == nil {
+ sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ } else {
+ glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
+ }
+ return
+}
+
+func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
+ for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
+ logicStart := max(off, chunkStartOffset+t.StartOffset)
+ logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
+ if logicStart < logicStop {
+ actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
+ if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ return
+}
+
+func (sc *SwapFileChunk) IsComplete() bool {
+ return sc.usage.IsComplete(sc.swapfile.chunkSize)
+}
+
+func (sc *SwapFileChunk) WrittenSize() int64 {
+ return sc.usage.WrittenSize()
+}
+
+func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+ if saveFn == nil {
+ return
+ }
+ for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
+ data := mem.Allocate(int(t.Size()))
+ sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ reader := util.NewBytesReader(data)
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
+ })
+ mem.Free(data)
+ }
+ sc.usage = newChunkWrittenIntervalList()
+}
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
new file mode 100644
index 000000000..53641e66d
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -0,0 +1,182 @@
+package page_writer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type LogicChunkIndex int
+
+type UploadPipeline struct {
+ filepath util.FullPath
+ ChunkSize int64
+ writableChunks map[LogicChunkIndex]PageChunk
+ writableChunksLock sync.Mutex
+ sealedChunks map[LogicChunkIndex]*SealedChunk
+ sealedChunksLock sync.Mutex
+ uploaders *util.LimitedConcurrentExecutor
+ uploaderCount int32
+ uploaderCountCond *sync.Cond
+ saveToStorageFn SaveToStorageFunc
+ activeReadChunks map[LogicChunkIndex]int
+ activeReadChunksLock sync.Mutex
+ bufferChunkLimit int
+}
+
+type SealedChunk struct {
+ chunk PageChunk
+ referenceCounter int // track uploading or reading processes
+}
+
+func (sc *SealedChunk) FreeReference(messageOnFree string) {
+ sc.referenceCounter--
+ if sc.referenceCounter == 0 {
+ glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
+ sc.chunk.FreeResource()
+ }
+}
+
+func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
+ return &UploadPipeline{
+ ChunkSize: chunkSize,
+ writableChunks: make(map[LogicChunkIndex]PageChunk),
+ sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
+ uploaders: writers,
+ uploaderCountCond: sync.NewCond(&sync.Mutex{}),
+ saveToStorageFn: saveToStorageFn,
+ activeReadChunks: make(map[LogicChunkIndex]int),
+ bufferChunkLimit: bufferChunkLimit,
+ }
+}
+
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+
+ logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
+
+ memChunk, found := up.writableChunks[logicChunkIndex]
+ if !found {
+ if len(up.writableChunks) < up.bufferChunkLimit {
+ memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ } else {
+ fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
+ for lci, mc := range up.writableChunks {
+ chunkFullness := mc.WrittenSize()
+ if fullness < chunkFullness {
+ fullestChunkIndex = lci
+ fullness = chunkFullness
+ }
+ }
+ up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
+ delete(up.writableChunks, fullestChunkIndex)
+ fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
+ memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ }
+ up.writableChunks[logicChunkIndex] = memChunk
+ }
+ n = memChunk.WriteDataAt(p, off)
+ up.maybeMoveToSealed(memChunk, logicChunkIndex)
+
+ return
+}
+
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+ logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
+
+ // read from sealed chunks first
+ up.sealedChunksLock.Lock()
+ sealedChunk, found := up.sealedChunks[logicChunkIndex]
+ if found {
+ sealedChunk.referenceCounter++
+ }
+ up.sealedChunksLock.Unlock()
+ if found {
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off)
+ glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
+ sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
+ }
+
+ // read from writable chunks last
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+ writableChunk, found := up.writableChunks[logicChunkIndex]
+ if !found {
+ return
+ }
+ writableMaxStop := writableChunk.ReadDataAt(p, off)
+ glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
+ maxStop = max(maxStop, writableMaxStop)
+
+ return
+}
+
+func (up *UploadPipeline) FlushAll() {
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+
+ for logicChunkIndex, memChunk := range up.writableChunks {
+ up.moveToSealed(memChunk, logicChunkIndex)
+ }
+
+ up.waitForCurrentWritersToComplete()
+}
+
+func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+ if memChunk.IsComplete() {
+ up.moveToSealed(memChunk, logicChunkIndex)
+ }
+}
+
+func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+ atomic.AddInt32(&up.uploaderCount, 1)
+ glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
+
+ up.sealedChunksLock.Lock()
+
+ if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
+ oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
+ }
+ sealedChunk := &SealedChunk{
+ chunk: memChunk,
+ referenceCounter: 1, // default 1 is for uploading process
+ }
+ up.sealedChunks[logicChunkIndex] = sealedChunk
+ delete(up.writableChunks, logicChunkIndex)
+
+ up.sealedChunksLock.Unlock()
+
+ up.uploaders.Execute(func() {
+ // first add to the file chunks
+ sealedChunk.chunk.SaveContent(up.saveToStorageFn)
+
+ // notify waiting process
+ atomic.AddInt32(&up.uploaderCount, -1)
+ glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
+ // Lock and Unlock are not required,
+ // but it may signal multiple times during one wakeup,
+ // and the waiting goroutine may miss some of them!
+ up.uploaderCountCond.L.Lock()
+ up.uploaderCountCond.Broadcast()
+ up.uploaderCountCond.L.Unlock()
+
+ // wait for readers
+ for up.IsLocked(logicChunkIndex) {
+ time.Sleep(59 * time.Millisecond)
+ }
+
+ // then remove from sealed chunks
+ up.sealedChunksLock.Lock()
+ defer up.sealedChunksLock.Unlock()
+ delete(up.sealedChunks, logicChunkIndex)
+ sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
+
+ })
+}
+
+func (up *UploadPipeline) Shutdown() {
+}
diff --git a/weed/mount/page_writer/upload_pipeline_lock.go b/weed/mount/page_writer/upload_pipeline_lock.go
new file mode 100644
index 000000000..47a40ba37
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline_lock.go
@@ -0,0 +1,63 @@
+package page_writer
+
+import (
+ "sync/atomic"
+)
+
+func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ up.activeReadChunks[i] = count + 1
+ } else {
+ up.activeReadChunks[i] = 1
+ }
+ }
+}
+
+func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ if count == 1 {
+ delete(up.activeReadChunks, i)
+ } else {
+ up.activeReadChunks[i] = count - 1
+ }
+ }
+ }
+}
+
+func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ if count, found := up.activeReadChunks[logicChunkIndex]; found {
+ return count > 0
+ }
+ return false
+}
+
+func (up *UploadPipeline) waitForCurrentWritersToComplete() {
+ up.uploaderCountCond.L.Lock()
+ t := int32(100)
+ for {
+ t = atomic.LoadInt32(&up.uploaderCount)
+ if t <= 0 {
+ break
+ }
+ up.uploaderCountCond.Wait()
+ }
+ up.uploaderCountCond.L.Unlock()
+}
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
new file mode 100644
index 000000000..816fb228b
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -0,0 +1,47 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "testing"
+)
+
+func TestUploadPipeline(t *testing.T) {
+
+ uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
+
+ writeRange(uploadPipeline, 0, 131072)
+ writeRange(uploadPipeline, 131072, 262144)
+ writeRange(uploadPipeline, 262144, 1025536)
+
+ confirmRange(t, uploadPipeline, 0, 1025536)
+
+ writeRange(uploadPipeline, 1025536, 1296896)
+
+ confirmRange(t, uploadPipeline, 1025536, 1296896)
+
+ writeRange(uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+}
+
+// startOff and stopOff must be divided by 4
+func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff / 4; i < stopOff/4; i += 4 {
+ util.Uint32toBytes(p, uint32(i))
+ uploadPipeline.SaveDataAt(p, i)
+ }
+}
+
+func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff; i < stopOff/4; i += 4 {
+ uploadPipeline.MaybeReadDataAt(p, i)
+ x := util.BytesToUint32(p)
+ if x != uint32(i) {
+ t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
+ }
+ }
+}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 1e9f07df9..b7f50cd13 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -2,14 +2,18 @@ package mount
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/util/grace"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/hanwen/go-fuse/v2/fuse"
"google.golang.org/grpc"
+ "math/rand"
"os"
"path"
"path/filepath"
@@ -54,13 +58,15 @@ type WFS struct {
// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
fuse.RawFileSystem
fs.Inode
- option *Option
- metaCache *meta_cache.MetaCache
- stats statsCache
- root Directory
- signature int32
- inodeToPath *InodeToPath
- fhmap *FileHandleToInode
+ option *Option
+ metaCache *meta_cache.MetaCache
+ stats statsCache
+ root Directory
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
+ concurrentWriters *util.LimitedConcurrentExecutor
+ inodeToPath *InodeToPath
+ fhmap *FileHandleToInode
}
func NewSeaweedFileSystem(option *Option) *WFS {
@@ -79,12 +85,21 @@ func NewSeaweedFileSystem(option *Option) *WFS {
parent: nil,
}
+ wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
+ wfs.option.setupUniqueCacheDirectory()
+ if option.CacheSizeMB > 0 {
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
+ }
+
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
+ if wfs.option.ConcurrentWriters > 0 {
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ }
return wfs
}
@@ -132,6 +147,19 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St
return cachedEntry.ToProtoEntry(), fuse.OK
}
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+}
+
+func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
+ return wfs.option.FilerAddresses[wfs.option.filerIndex]
+}
+
func (option *Option) setupUniqueCacheDirectory() {
cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
diff --git a/weed/mount/weedfs_file_read.go b/weed/mount/weedfs_file_read.go
index d9ad1f4ea..00143a5b4 100644
--- a/weed/mount/weedfs_file_read.go
+++ b/weed/mount/weedfs_file_read.go
@@ -1,7 +1,9 @@
package mount
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/hanwen/go-fuse/v2/fuse"
+ "io"
)
/**
@@ -29,6 +31,29 @@ import (
* @param off offset to read from
* @param fi file information
*/
-func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buf []byte) (fuse.ReadResult, fuse.Status) {
- return nil, fuse.ENOSYS
+func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
+ fh := wfs.GetHandle(FileHandleId(in.Fh))
+ if fh == nil {
+ return nil, fuse.ENOENT
+ }
+
+ offset := int64(in.Offset)
+ fh.lockForRead(offset, len(buff))
+ defer fh.unlockForRead(offset, len(buff))
+
+ totalRead, err := fh.readFromChunks(buff, offset)
+ if err == nil || err == io.EOF {
+ maxStop := fh.readFromDirtyPages(buff, offset)
+ totalRead = max(maxStop-offset, totalRead)
+ }
+ if err == io.EOF {
+ err = nil
+ }
+
+ if err != nil {
+ glog.Warningf("file handle read %s %d: %v", fh.FullPath(), totalRead, err)
+ return nil, fuse.EIO
+ }
+
+ return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go
index 551394262..03f72282e 100644
--- a/weed/mount/weedfs_filehandle.go
+++ b/weed/mount/weedfs_filehandle.go
@@ -5,7 +5,7 @@ import "github.com/hanwen/go-fuse/v2/fuse"
func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHandle, code fuse.Status) {
_, entry, status := wfs.maybeReadEntry(inode)
if status == fuse.OK {
- fileHandle = wfs.fhmap.GetFileHandle(inode)
+ fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry)
fileHandle.entry = entry
}
return
@@ -14,3 +14,7 @@ func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHa
func (wfs *WFS) ReleaseHandle(handleId FileHandleId) {
wfs.fhmap.ReleaseByHandle(handleId)
}
+
+func (wfs *WFS) GetHandle(handleId FileHandleId) *FileHandle {
+ return wfs.fhmap.GetFileHandle(handleId)
+}
diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go
new file mode 100644
index 000000000..723ce9c34
--- /dev/null
+++ b/weed/mount/weedfs_write.go
@@ -0,0 +1,84 @@
+package mount
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DiskType: string(wfs.option.DiskType),
+ DataCenter: wfs.option.DataCenter,
+ Path: string(fullPath),
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := resp.Location
+ host = wfs.AdjustedUrl(loc)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ })
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
+ }
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fileUrl,
+ Filename: filename,
+ Cipher: wfs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, err, data := operation.Upload(reader, uploadOption)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ if offset == 0 {
+ wfs.chunkCache.SetChunk(fileId, data)
+ }
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, collection, replication, nil
+ }
+}