aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/memory_map/memory_map_windows.go (renamed from weed/storage/memory_map_windows.go)42
-rw-r--r--weed/storage/needle/needle_read_write.go165
-rw-r--r--weed/storage/volume_read_write.go6
-rw-r--r--weed/storage/volume_super_block.go67
4 files changed, 170 insertions, 110 deletions
diff --git a/weed/storage/memory_map_windows.go b/weed/storage/memory_map/memory_map_windows.go
index a6c96caaf..e293be515 100644
--- a/weed/storage/memory_map_windows.go
+++ b/weed/storage/memory_map/memory_map_windows.go
@@ -1,6 +1,6 @@
// +build windows
-package storage
+package memory_map
import (
"reflect"
@@ -13,19 +13,20 @@ import (
type DWORD = uint32
type WORD = uint16
-type memory_buffer struct {
+type MemoryBuffer struct {
aligned_length uint64
length uint64
aligned_ptr uintptr
ptr uintptr
- buffer []byte
+ Buffer []byte
}
-type memory_map struct {
+type MemoryMap struct {
file_handle windows.Handle
file_memory_map_handle windows.Handle
- write_map_views []memory_buffer
+ write_map_views []MemoryBuffer
max_length uint64
+ End_Of_File int64
// read_map_views []memory_buffer
}
@@ -33,13 +34,15 @@ var (
procGetSystemInfo = syscall.NewLazyDLL("kernel32.dll").NewProc("GetSystemInfo")
)
+var FileMemoryMap = make(map[string]MemoryMap)
+
var system_info, err = getSystemInfo()
var chunk_size = uint64(system_info.dwAllocationGranularity) * 512
-func CreateMemoryMap(hFile windows.Handle, maxlength uint64) memory_map {
+func CreateMemoryMap(hFile windows.Handle, maxlength uint64) MemoryMap {
- mem_map := memory_map{}
+ mem_map := MemoryMap{}
maxlength_high := uint32(maxlength >> 32)
maxlength_low := uint32(maxlength & 0xFFFFFFFF)
file_memory_map_handle, err := windows.CreateFileMapping(hFile, nil, windows.PAGE_READWRITE, maxlength_high, maxlength_low, nil)
@@ -48,12 +51,13 @@ func CreateMemoryMap(hFile windows.Handle, maxlength uint64) memory_map {
mem_map.file_handle = hFile
mem_map.file_memory_map_handle = file_memory_map_handle
mem_map.max_length = maxlength
+ mem_map.End_Of_File = -1
}
return mem_map
}
-func DeleteFileAndMemoryMap(mem_map memory_map) {
+func DeleteFileAndMemoryMap(mem_map MemoryMap) {
windows.CloseHandle(mem_map.file_memory_map_handle)
windows.CloseHandle(mem_map.file_handle)
@@ -72,7 +76,7 @@ func min(x, y uint64) uint64 {
return y
}
-func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte) {
+func WriteMemory(mem_map MemoryMap, offset uint64, length uint64, data []byte) {
for {
if ((offset+length)/chunk_size)+1 > uint64(len(mem_map.write_map_views)) {
@@ -89,7 +93,7 @@ func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte)
for {
write_end := min(remaining_length, chunk_size)
- copy(mem_map.write_map_views[slice_index].buffer[slice_offset:write_end], data[data_offset:])
+ copy(mem_map.write_map_views[slice_index].Buffer[slice_offset:write_end], data[data_offset:])
remaining_length -= (write_end - slice_offset)
data_offset += (write_end - slice_offset)
@@ -100,23 +104,27 @@ func WriteMemory(mem_map memory_map, offset uint64, length uint64, data []byte)
break
}
}
+
+ if mem_map.End_Of_File < int64(offset+length) {
+ mem_map.End_Of_File = int64(offset + length)
+ }
}
-func ReadMemory(mem_map memory_map, offset uint64, length uint64) (memory_buffer, error) {
+func ReadMemory(mem_map MemoryMap, offset uint64, length uint64) (MemoryBuffer, error) {
return allocate(mem_map.file_memory_map_handle, offset, length, false)
}
-func ReleaseMemory(mem_buffer memory_buffer) {
+func ReleaseMemory(mem_buffer MemoryBuffer) {
windows.UnmapViewOfFile(mem_buffer.aligned_ptr)
mem_buffer.ptr = 0
mem_buffer.aligned_ptr = 0
mem_buffer.length = 0
mem_buffer.aligned_length = 0
- mem_buffer.buffer = nil
+ mem_buffer.Buffer = nil
}
-func allocateChunk(mem_map memory_map) {
+func allocateChunk(mem_map MemoryMap) {
start := uint64(len(mem_map.write_map_views)-1) * chunk_size
mem_buffer, err := allocate(mem_map.file_memory_map_handle, start, chunk_size, true)
@@ -126,9 +134,9 @@ func allocateChunk(mem_map memory_map) {
}
}
-func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (memory_buffer, error) {
+func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) (MemoryBuffer, error) {
- mem_buffer := memory_buffer{}
+ mem_buffer := MemoryBuffer{}
dwSysGran := system_info.dwAllocationGranularity
@@ -160,7 +168,7 @@ func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool)
mem_buffer.ptr = addr_ptr + uintptr(diff)
mem_buffer.length = length
- slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mem_buffer.buffer))
+ slice_header := (*reflect.SliceHeader)(unsafe.Pointer(&mem_buffer.Buffer))
slice_header.Data = addr_ptr + uintptr(diff)
slice_header.Len = int(length)
slice_header.Cap = int(length)
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 75aefdf16..828447bbe 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/joeslay/seaweedfs/weed/storage/memory_map"
)
const (
@@ -29,39 +30,25 @@ func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
-func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
- if end, e := w.Seek(0, io.SeekEnd); e == nil {
- defer func(w *os.File, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
- }
- }
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
- return
- }
+func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) {
+
+ writeBytes := make([]byte, 0)
+
switch version {
case Version1:
header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
n.Size = uint32(len(n.Data))
- size = n.Size
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
- if _, err = w.Write(header); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
- actualSize = NeedleHeaderSize + int64(n.Size)
+ size := n.Size
+ actualSize := NeedleHeaderSize + int64(n.Size)
+ writeBytes = append(writeBytes, header...)
+ writeBytes = append(writeBytes, n.Data...)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
- return
+ writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
+ return writeBytes, size, actualSize, nil
case Version2, Version3:
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
CookieToBytes(header[0:CookieSize], n.Cookie)
@@ -92,82 +79,103 @@ func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32
} else {
n.Size = 0
}
- size = n.DataSize
util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
- if _, err = w.Write(header[0:NeedleHeaderSize]); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
if n.DataSize > 0 {
util.Uint32toBytes(header[0:4], n.DataSize)
- if _, err = w.Write(header[0:4]); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:4]...)
+ writeBytes = append(writeBytes, n.Data...)
util.Uint8toBytes(header[0:1], n.Flags)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:1]...)
if n.HasName() {
util.Uint8toBytes(header[0:1], n.NameSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Name[:n.NameSize]); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:1]...)
+ writeBytes = append(writeBytes, n.Name[:n.NameSize]...)
}
if n.HasMime() {
util.Uint8toBytes(header[0:1], n.MimeSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Mime); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:1]...)
+ writeBytes = append(writeBytes, n.Mime...)
}
if n.HasLastModifiedDate() {
util.Uint64toBytes(header[0:8], n.LastModified)
- if _, err = w.Write(header[8-LastModifiedBytesLength : 8]); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...)
}
if n.HasTtl() && n.Ttl != nil {
n.Ttl.ToBytes(header[0:TtlBytesLength])
- if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:TtlBytesLength]...)
}
if n.HasPairs() {
util.Uint16toBytes(header[0:2], n.PairsSize)
- if _, err = w.Write(header[0:2]); err != nil {
- return
- }
- if _, err = w.Write(n.Pairs); err != nil {
- return
- }
+ writeBytes = append(writeBytes, header[0:2]...)
+ writeBytes = append(writeBytes, n.Pairs...)
}
}
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
if version == Version2 {
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
+ writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
} else {
// version3
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
- _, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
+ writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
}
- return offset, n.DataSize, GetActualSize(n.Size, version), err
+ return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil
}
- return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
+
+ return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
+}
+
+func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+
+ mem_map, exists := memory_map.FileMemoryMap[w.Name()]
+ if !exists {
+ if end, e := w.Seek(0, io.SeekEnd); e == nil {
+ defer func(w *os.File, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+ }
+ }
+ }(w, end)
+ offset = uint64(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+ } else {
+ offset = uint64(mem_map.End_Of_File + 1)
+ }
+
+ bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
+
+ if err == nil {
+ if exists {
+ memory_map.WriteMemory(mem_map, offset, uint64(len(bytesToWrite)), bytesToWrite)
+ } else {
+ _, err = w.Write(bytesToWrite)
+ }
+ }
+
+ return offset, size, actualSize, err
}
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
- dataSlice = make([]byte, int(GetActualSize(size, version)))
- _, err = r.ReadAt(dataSlice, offset)
- return dataSlice, err
+
+ dataSize := GetActualSize(size, version)
+ dataSlice = make([]byte, dataSize)
+
+ mem_map, exists := memory_map.FileMemoryMap[r.Name()]
+ if exists {
+ mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), uint64(dataSize))
+ copy(dataSlice, mem_buffer.Buffer)
+ memory_map.ReleaseMemory(mem_buffer)
+ return dataSlice, err
+ } else {
+ _, err = r.ReadAt(dataSlice, offset)
+ return dataSlice, err
+ }
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
@@ -280,14 +288,27 @@ func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, byt
n = new(Needle)
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
- var count int
- count, err = r.ReadAt(bytes, offset)
- if count <= 0 || err != nil {
- return nil, bytes, 0, err
+
+ mem_map, exists := memory_map.FileMemoryMap[r.Name()]
+ if exists {
+ mem_buffer, err := memory_map.ReadMemory(mem_map, uint64(offset), NeedleHeaderSize)
+ copy(bytes, mem_buffer.Buffer)
+ memory_map.ReleaseMemory(mem_buffer)
+
+ if err != nil {
+ return nil, bytes, 0, err
+ }
+ } else {
+ var count int
+ count, err = r.ReadAt(bytes, offset)
+ if count <= 0 || err != nil {
+ return nil, bytes, 0, err
+ }
}
n.ParseNeedleHeader(bytes)
bodyLength = NeedleBodyLength(n.Size, version)
}
+
return
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index ae05331a4..c7ba28e53 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/joeslay/seaweedfs/weed/storage/memory_map"
)
var ErrorNotFound = errors.New("not found")
@@ -48,6 +49,11 @@ func (v *Volume) Destroy() (err error) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}
+ mem_map, exists := memory_map.FileMemoryMap[v.FileName()]
+ if exists {
+ memory_map.DeleteFileAndMemoryMap(mem_map)
+ }
+
v.Close()
os.Remove(v.FileName() + ".dat")
os.Remove(v.FileName() + ".idx")
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 164c887e1..9ef615cb3 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -4,6 +4,8 @@ import (
"fmt"
"os"
+ "github.com/joeslay/seaweedfs/weed/storage/memory_map"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -70,24 +72,34 @@ func (s *SuperBlock) Bytes() []byte {
}
func (v *Volume) maybeWriteSuperBlock() error {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.version = needle.CurrentVersion
- _, e = v.dataFile.Write(v.SuperBlock.Bytes())
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
+
+ mem_map, exists := memory_map.FileMemoryMap[v.FileName()]
+ if exists {
+ if mem_map.End_Of_File == -1 {
+ v.SuperBlock.version = needle.CurrentVersion
+ memory_map.WriteMemory(mem_map, 0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
+ }
+ return nil
+ } else {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
+ return e
+ }
+ if stat.Size() == 0 {
+ v.SuperBlock.version = needle.CurrentVersion
+ _, e = v.dataFile.Write(v.SuperBlock.Bytes())
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
+ if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
+ v.readOnly = false
+ }
}
}
}
+ return e
}
- return e
}
func (v *Volume) readSuperBlock() (err error) {
@@ -97,15 +109,28 @@ func (v *Volume) readSuperBlock() (err error) {
// ReadSuperBlock reads from data file and load it into volume's super block
func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
- if _, err = dataFile.Seek(0, 0); err != nil {
- err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err)
- return
- }
+
header := make([]byte, _SuperBlockSize)
- if _, e := dataFile.Read(header); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
- return
+ mem_map, exists := memory_map.FileMemoryMap[dataFile.Name()]
+ if exists {
+ mem_buffer, e := memory_map.ReadMemory(mem_map, 0, _SuperBlockSize)
+ if err != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
+ return
+ }
+ copy(header, mem_buffer.Buffer)
+ memory_map.ReleaseMemory(mem_buffer)
+ } else {
+ if _, err = dataFile.Seek(0, 0); err != nil {
+ err = fmt.Errorf("cannot seek to the beginning of %s: %v", dataFile.Name(), err)
+ return
+ }
+ if _, e := dataFile.Read(header); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
+ return
+ }
}
+
superBlock.version = needle.Version(header[0])
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())