aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-11-09 00:10:59 -0800
committerChris Lu <chris.lu@gmail.com>2019-11-09 00:10:59 -0800
commit85f8649320033a46e71b20a588930013c00b3fdf (patch)
tree3cbcd4988d1327c465896df294141ee52f2b4701
parentc5c1d83d91e5de51dd51b96cf338b00f38532d6c (diff)
downloadseaweedfs-85f8649320033a46e71b20a588930013c00b3fdf.tar.xz
seaweedfs-85f8649320033a46e71b20a588930013c00b3fdf.zip
refactor memory mapped file into backend storage
-rw-r--r--weed/storage/backend/memory_map/memory_map.go2
-rw-r--r--weed/storage/backend/memory_map/memory_map_backend.go60
-rw-r--r--weed/storage/needle/needle_read_write.go60
-rw-r--r--weed/storage/volume_create.go5
-rw-r--r--weed/storage/volume_create_linux.go5
-rw-r--r--weed/storage/volume_create_windows.go32
-rw-r--r--weed/storage/volume_loading.go6
-rw-r--r--weed/storage/volume_read_write.go7
-rw-r--r--weed/storage/volume_super_block.go60
-rw-r--r--weed/storage/volume_vacuum.go5
10 files changed, 122 insertions, 120 deletions
diff --git a/weed/storage/backend/memory_map/memory_map.go b/weed/storage/backend/memory_map/memory_map.go
index e940fcc0e..5dc7ba33d 100644
--- a/weed/storage/backend/memory_map/memory_map.go
+++ b/weed/storage/backend/memory_map/memory_map.go
@@ -21,8 +21,6 @@ type MemoryMap struct {
End_of_file int64
}
-var FileMemoryMap = make(map[string]*MemoryMap)
-
func ReadMemoryMapMaxSizeMb(memoryMapMaxSizeMbString string) (uint32, error) {
if memoryMapMaxSizeMbString == "" {
return 0, nil
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
new file mode 100644
index 000000000..d999b917e
--- /dev/null
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -0,0 +1,60 @@
+package memory_map
+
+import (
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+)
+
+var (
+ _ backend.DataStorageBackend = &MemoryMappedFile{}
+)
+
+type MemoryMappedFile struct {
+ mm *MemoryMap
+}
+
+func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile {
+ mmf := &MemoryMappedFile{
+ mm : new(MemoryMap),
+ }
+ mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB))
+ return mmf
+}
+
+func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) {
+ readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p)))
+ if e != nil {
+ return 0, e
+ }
+ // TODO avoid the extra copy
+ copy(p, readBytes)
+ return len(readBytes), nil
+}
+
+func (mmf *MemoryMappedFile) WriteAt(p []byte, off int64) (n int, err error) {
+ mmf.mm.WriteMemory(uint64(off), uint64(len(p)), p)
+ return len(p), nil
+}
+
+func (mmf *MemoryMappedFile) Truncate(off int64) error {
+ return nil
+}
+
+func (mmf *MemoryMappedFile) Close() error {
+ mmf.mm.DeleteFileAndMemoryMap()
+ return nil
+}
+
+func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err error) {
+ stat, e := mmf.mm.File.Stat()
+ if e == nil {
+ return stat.Size(), stat.ModTime(), nil
+ }
+ return 0, time.Time{}, err
+}
+
+func (mmf *MemoryMappedFile) String() string {
+ return mmf.mm.File.Name()
+}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 353a3c78b..8e5d18b1a 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -128,33 +127,24 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) {
- mMap, exists := memory_map.FileMemoryMap[w.String()]
- if !exists {
- if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.DataStorageBackend, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
- }
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.DataStorageBackend, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
}
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
- return
- }
+ }
+ }(w, end)
+ offset = uint64(end)
} else {
- offset = uint64(mMap.End_of_file + 1)
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
}
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
if err == nil {
- if exists {
- mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
- } else {
- _, err = w.WriteAt(bytesToWrite, int64(offset))
- }
+ _, err = w.WriteAt(bytesToWrite, int64(offset))
}
return offset, size, actualSize, err
@@ -165,14 +155,9 @@ func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, ver
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
- mMap, exists := memory_map.FileMemoryMap[r.String()]
- if exists {
- dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize))
- return dataSlice, err
- } else {
- _, err = r.ReadAt(dataSlice, offset)
- return dataSlice, err
- }
+ _, err = r.ReadAt(dataSlice, offset)
+ return dataSlice, err
+
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
@@ -286,19 +271,12 @@ func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int6
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
- mMap, exists := memory_map.FileMemoryMap[r.String()]
- if exists {
- bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize)
- if err != nil {
- return nil, bytes, 0, err
- }
- } else {
- var count int
- count, err = r.ReadAt(bytes, offset)
- if count <= 0 || err != nil {
- return nil, bytes, 0, err
- }
+ var count int
+ count, err = r.ReadAt(bytes, offset)
+ if count <= 0 || err != nil {
+ return nil, bytes, 0, err
}
+
n.ParseNeedleHeader(bytes)
bodyLength = NeedleBodyLength(n.Size, version)
}
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
index ef58e5871..b27a62990 100644
--- a/weed/storage/volume_create.go
+++ b/weed/storage/volume_create.go
@@ -6,12 +6,13 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
- return file, e
+ return backend.NewDiskFile(file), e
}
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go
index d9dfc3862..e3305d991 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/volume_create_linux.go
@@ -7,13 +7,14 @@ import (
"syscall"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
- return file, e
+ return backend.NewDiskFile(file), e
}
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go
index 12826f613..81536810b 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/volume_create_windows.go
@@ -3,36 +3,26 @@
package storage
import (
- "os"
-
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"golang.org/x/sys/windows"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
-
- mMap, exists := memory_map.FileMemoryMap[fileName]
- if !exists {
-
- if preallocate > 0 {
- glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
- }
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
- if memoryMapSizeMB > 0 {
- file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
- memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
+ if preallocate > 0 {
+ glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
+ }
- new_mMap := memory_map.FileMemoryMap[fileName]
- new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB))
- return file, e
- } else {
- file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
- return file, e
- }
+ if memoryMapSizeMB > 0 {
+ file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
+ return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e
} else {
- return mMap.File, nil
+ file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
+ return backend.NewDiskFile(file), e
}
+
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index e769321c4..6f1d8fe40 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -25,13 +25,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
var e error
fileName := v.FileName()
alreadyHasSuperBlock := false
- var dataFile *os.File
// open dat file
if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
+ var dataFile *os.File
if canWrite {
dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
} else {
@@ -43,14 +43,14 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if fileSize >= _SuperBlockSize {
alreadyHasSuperBlock = true
}
+ v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
- dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+ v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
} else {
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
}
}
- v.DataBackend = backend.NewDiskFile(dataFile)
if e != nil {
if !os.IsPermission(e) {
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index f2b17a827..d9b79795b 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -46,12 +45,6 @@ func (v *Volume) Destroy() (err error) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}
- mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()]
- if exists {
- mMap.DeleteFileAndMemoryMap()
- delete(memory_map.FileMemoryMap, v.DataBackend.String())
- }
-
v.Close()
os.Remove(v.FileName() + ".dat")
os.Remove(v.FileName() + ".idx")
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 6faf34ef3..bce5af465 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -4,11 +4,9 @@ import (
"fmt"
"os"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -78,35 +76,26 @@ func (s *SuperBlock) Initialized() bool {
func (v *Volume) maybeWriteSuperBlock() error {
- mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()]
- if exists {
- if mMap.End_of_file == -1 {
- v.SuperBlock.version = needle.CurrentVersion
- mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
- }
- return nil
- } else {
- datSize, _, e := v.DataBackend.GetStat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
- return e
- }
- if datSize == 0 {
- v.SuperBlock.version = needle.CurrentVersion
- _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- var dataFile *os.File
- if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
- v.DataBackend = backend.NewDiskFile(dataFile)
- if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
- v.readOnly = false
- }
+ datSize, _, e := v.DataBackend.GetStat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
+ return e
+ }
+ if datSize == 0 {
+ v.SuperBlock.version = needle.CurrentVersion
+ _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ var dataFile *os.File
+ if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
+ v.DataBackend = backend.NewDiskFile(dataFile)
+ if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
+ v.readOnly = false
}
}
}
- return e
}
+ return e
}
func (v *Volume) readSuperBlock() (err error) {
@@ -118,18 +107,9 @@ func (v *Volume) readSuperBlock() (err error) {
func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
- mMap, exists := memory_map.FileMemoryMap[datBackend.String()]
- if exists {
- header, err = mMap.ReadMemory(0, _SuperBlockSize)
- if err != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), err)
- return
- }
- } else {
- if _, e := datBackend.ReadAt(header, 0); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
- return
- }
+ if _, e := datBackend.ReadAt(header, 0); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
+ return
}
superBlock.version = needle.Version(header[0])
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index e5d932c9e..e90746b54 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -312,7 +312,8 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dst, idx *os.File
+ dst backend.DataStorageBackend
+ idx *os.File
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
@@ -328,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
v: v,
now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx),
- dstBackend: backend.NewDiskFile(dst),
+ dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)