aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/backup.go2
-rw-r--r--weed/server/volume_grpc_copy_incremental.go8
-rw-r--r--weed/server/volume_grpc_tail.go2
-rw-r--r--weed/storage/backend/backend.go15
-rw-r--r--weed/storage/backend/disk_file.go50
-rw-r--r--weed/storage/backend/memory_map/memory_map.go2
-rw-r--r--weed/storage/backend/memory_map/memory_map_backend.go60
-rw-r--r--weed/storage/needle/needle_read_write.go73
-rw-r--r--weed/storage/needle/needle_read_write_test.go6
-rw-r--r--weed/storage/store.go4
-rw-r--r--weed/storage/volume.go27
-rw-r--r--weed/storage/volume_backup.go17
-rw-r--r--weed/storage/volume_checking.go5
-rw-r--r--weed/storage/volume_create.go5
-rw-r--r--weed/storage/volume_create_linux.go5
-rw-r--r--weed/storage/volume_create_windows.go32
-rw-r--r--weed/storage/volume_loading.go10
-rw-r--r--weed/storage/volume_read_write.go34
-rw-r--r--weed/storage/volume_super_block.go63
-rw-r--r--weed/storage/volume_vacuum.go43
20 files changed, 272 insertions, 191 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 505de4ae6..cef2bbe3a 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -128,7 +128,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
- v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0)
+ v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
}
datSize, _, _ := v.FileStat()
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index f56fbeef4..6c5bb8a62 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -4,9 +4,9 @@ import (
"context"
"fmt"
"io"
- "os"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
@@ -30,7 +30,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
startOffset := foundOffset.ToAcutalOffset()
buf := make([]byte, 1024*1024*2)
- return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream)
+ return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
}
@@ -47,10 +47,10 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
-func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+func sendFileContent(datBackend backend.DataStorageBackend, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
var blockSizeLimit = int64(len(buf))
for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
- n, readErr := datFile.ReadAt(buf, startOffset+i)
+ n, readErr := datBackend.ReadAt(buf, startOffset+i)
if readErr == nil || readErr == io.EOF {
resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
resp.FileContent = buf[:int64(n)]
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index cb0d320ad..1bf61e1c7 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -71,7 +71,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
stream: stream,
}
- err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner)
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
return scanner.lastProcessedTimestampNs, err
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
new file mode 100644
index 000000000..ae0f84216
--- /dev/null
+++ b/weed/storage/backend/backend.go
@@ -0,0 +1,15 @@
+package backend
+
+import (
+ "io"
+ "time"
+)
+
+type DataStorageBackend interface {
+ io.ReaderAt
+ io.WriterAt
+ Truncate(off int64) error
+ io.Closer
+ GetStat() (datSize int64, modTime time.Time, err error)
+ String() string
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
new file mode 100644
index 000000000..7f2b39d15
--- /dev/null
+++ b/weed/storage/backend/disk_file.go
@@ -0,0 +1,50 @@
+package backend
+
+import (
+ "os"
+ "time"
+)
+
+var (
+ _ DataStorageBackend = &DiskFile{}
+)
+
+type DiskFile struct {
+ File *os.File
+ fullFilePath string
+}
+
+func NewDiskFile(f *os.File) *DiskFile {
+ return &DiskFile{
+ fullFilePath: f.Name(),
+ File: f,
+ }
+}
+
+func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
+ return df.File.ReadAt(p, off)
+}
+
+func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
+ return df.File.WriteAt(p, off)
+}
+
+func (df *DiskFile) Truncate(off int64) error {
+ return df.File.Truncate(off)
+}
+
+func (df *DiskFile) Close() error {
+ return df.File.Close()
+}
+
+func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
+ stat, e := df.File.Stat()
+ if e == nil {
+ return stat.Size(), stat.ModTime(), nil
+ }
+ return 0, time.Time{}, err
+}
+
+func (df *DiskFile) String() string {
+ return df.fullFilePath
+}
diff --git a/weed/storage/backend/memory_map/memory_map.go b/weed/storage/backend/memory_map/memory_map.go
index e940fcc0e..5dc7ba33d 100644
--- a/weed/storage/backend/memory_map/memory_map.go
+++ b/weed/storage/backend/memory_map/memory_map.go
@@ -21,8 +21,6 @@ type MemoryMap struct {
End_of_file int64
}
-var FileMemoryMap = make(map[string]*MemoryMap)
-
func ReadMemoryMapMaxSizeMb(memoryMapMaxSizeMbString string) (uint32, error) {
if memoryMapMaxSizeMbString == "" {
return 0, nil
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
new file mode 100644
index 000000000..d999b917e
--- /dev/null
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -0,0 +1,60 @@
+package memory_map
+
+import (
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+)
+
+var (
+ _ backend.DataStorageBackend = &MemoryMappedFile{}
+)
+
+type MemoryMappedFile struct {
+ mm *MemoryMap
+}
+
+func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile {
+ mmf := &MemoryMappedFile{
+ mm : new(MemoryMap),
+ }
+ mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB))
+ return mmf
+}
+
+func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) {
+ readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p)))
+ if e != nil {
+ return 0, e
+ }
+ // TODO avoid the extra copy
+ copy(p, readBytes)
+ return len(readBytes), nil
+}
+
+func (mmf *MemoryMappedFile) WriteAt(p []byte, off int64) (n int, err error) {
+ mmf.mm.WriteMemory(uint64(off), uint64(len(p)), p)
+ return len(p), nil
+}
+
+func (mmf *MemoryMappedFile) Truncate(off int64) error {
+ return nil
+}
+
+func (mmf *MemoryMappedFile) Close() error {
+ mmf.mm.DeleteFileAndMemoryMap()
+ return nil
+}
+
+func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err error) {
+ stat, e := mmf.mm.File.Stat()
+ if e == nil {
+ return stat.Size(), stat.ModTime(), nil
+ }
+ return 0, time.Time{}, err
+}
+
+func (mmf *MemoryMappedFile) String() string {
+ return mmf.mm.File.Name()
+}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 04308a8a7..8e5d18b1a 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -4,12 +4,10 @@ import (
"errors"
"fmt"
"io"
- "os"
-
"math"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -127,53 +125,39 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) {
- mMap, exists := memory_map.FileMemoryMap[w.Name()]
- if !exists {
- if end, e := w.Seek(0, io.SeekEnd); e == nil {
- defer func(w *os.File, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
- }
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.DataStorageBackend, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
}
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
- return
- }
+ }
+ }(w, end)
+ offset = uint64(end)
} else {
- offset = uint64(mMap.End_of_file + 1)
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
}
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
if err == nil {
- if exists {
- mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite)
- } else {
- _, err = w.WriteAt(bytesToWrite, int64(offset))
- }
+ _, err = w.WriteAt(bytesToWrite, int64(offset))
}
return offset, size, actualSize, err
}
-func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
+func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
- mMap, exists := memory_map.FileMemoryMap[r.Name()]
- if exists {
- dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize))
- return dataSlice, err
- } else {
- _, err = r.ReadAt(dataSlice, offset)
- return dataSlice, err
- }
+ _, err = r.ReadAt(dataSlice, offset)
+ return dataSlice, err
+
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
@@ -207,7 +191,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers
}
// ReadData hydrates the needle from the file, with only n.Id is set.
-func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadData(r backend.DataStorageBackend, offset int64, size uint32, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size, version)
if err != nil {
return err
@@ -282,24 +266,17 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
return nil
}
-func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
+func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
n = new(Needle)
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
- mMap, exists := memory_map.FileMemoryMap[r.Name()]
- if exists {
- bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize)
- if err != nil {
- return nil, bytes, 0, err
- }
- } else {
- var count int
- count, err = r.ReadAt(bytes, offset)
- if count <= 0 || err != nil {
- return nil, bytes, 0, err
- }
+ var count int
+ count, err = r.ReadAt(bytes, offset)
+ if count <= 0 || err != nil {
+ return nil, bytes, 0, err
}
+
n.ParseNeedleHeader(bytes)
bodyLength = NeedleBodyLength(n.Size, version)
}
@@ -324,7 +301,7 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 {
//n should be a needle already read the header
//the input stream will read until next file entry
-func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
+func (n *Needle) ReadNeedleBody(r backend.DataStorageBackend, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
if bodyLength <= 0 {
return nil, nil
diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go
index 4c507f9e6..47582dd26 100644
--- a/weed/storage/needle/needle_read_write_test.go
+++ b/weed/storage/needle/needle_read_write_test.go
@@ -5,6 +5,7 @@ import (
"os"
"testing"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -54,7 +55,10 @@ func TestAppend(t *testing.T) {
os.Remove(tempFile.Name())
}()
- offset, _, _, _ := n.Append(tempFile, CurrentVersion)
+ datBackend := backend.NewDiskFile(tempFile)
+ defer datBackend.Close()
+
+ offset, _, _, _ := n.Append(datBackend, CurrentVersion)
if offset != uint64(fileSize) {
t.Errorf("Fail to Append Needle.")
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 1d909c23a..4d1061bed 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, MemoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.FindFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, MemoryMapMaxSizeMb); err == nil {
+ if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 6084b4df0..e85696eab 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -5,10 +5,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "os"
"path"
"strconv"
"sync"
@@ -21,7 +21,7 @@ type Volume struct {
Id needle.VolumeId
dir string
Collection string
- dataFile *os.File
+ DataBackend backend.DataStorageBackend
nm NeedleMapper
needleMapKind NeedleMapType
readOnly bool
@@ -39,16 +39,16 @@ type Volume struct {
isCompacting bool
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, MemoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
- v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: MemoryMapMaxSizeMb}
+ v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
return
}
func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly)
+ return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -63,9 +63,6 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) {
func (v *Volume) FileName() (fileName string) {
return VolumeFileName(v.dir, v.Collection, int(v.Id))
}
-func (v *Volume) DataFile() *os.File {
- return v.dataFile
-}
func (v *Volume) Version() needle.Version {
return v.SuperBlock.Version()
@@ -75,15 +72,15 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
- if v.dataFile == nil {
+ if v.DataBackend == nil {
return
}
- stat, e := v.dataFile.Stat()
+ datFileSize, modTime, e := v.DataBackend.GetStat()
if e == nil {
- return uint64(stat.Size()), v.nm.IndexFileSize(), stat.ModTime()
+ return uint64(datFileSize), v.nm.IndexFileSize(), modTime
}
- glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
+ glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.String(), e)
return // -1 causes integer overflow and the volume to become unwritable.
}
@@ -149,9 +146,9 @@ func (v *Volume) Close() {
v.nm.Close()
v.nm = nil
}
- if v.dataFile != nil {
- _ = v.dataFile.Close()
- v.dataFile = nil
+ if v.DataBackend != nil {
+ _ = v.DataBackend.Close()
+ v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
}
}
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index f48ccbb68..fe0506917 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -19,8 +19,8 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons
defer v.dataFileAccessLock.Unlock()
var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
- if stat, err := v.dataFile.Stat(); err == nil {
- syncStatus.TailOffset = uint64(stat.Size())
+ if datSize, _, err := v.DataBackend.GetStat(); err == nil {
+ syncStatus.TailOffset = uint64(datSize)
}
syncStatus.Collection = v.Collection
syncStatus.IdxFileSize = v.nm.IndexFileSize()
@@ -70,6 +70,8 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
return err
}
+ writeOffset := int64(startFromOffset)
+
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
@@ -80,8 +82,6 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
return err
}
- v.dataFile.Seek(int64(startFromOffset), io.SeekStart)
-
for {
resp, recvErr := stream.Recv()
if recvErr != nil {
@@ -92,10 +92,11 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
}
- _, writeErr := v.dataFile.Write(resp.FileContent)
+ n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset)
if writeErr != nil {
return writeErr
}
+ writeOffset += int64(n)
}
return nil
@@ -107,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
- return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
+ return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -153,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
- _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 8f930546f..61b59e9f7 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -29,7 +30,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
if size == TombstoneFileSize {
size = 0
}
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
+ if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
}
return
@@ -54,7 +55,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
+func verifyNeedleIntegrity(datFile backend.DataStorageBackend, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, err
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
index ef58e5871..b27a62990 100644
--- a/weed/storage/volume_create.go
+++ b/weed/storage/volume_create.go
@@ -6,12 +6,13 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
- return file, e
+ return backend.NewDiskFile(file), e
}
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go
index d9dfc3862..e3305d991 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/volume_create_linux.go
@@ -7,13 +7,14 @@ import (
"syscall"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
- return file, e
+ return backend.NewDiskFile(file), e
}
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go
index 12826f613..81536810b 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/volume_create_windows.go
@@ -3,36 +3,26 @@
package storage
import (
- "os"
-
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"golang.org/x/sys/windows"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) {
-
- mMap, exists := memory_map.FileMemoryMap[fileName]
- if !exists {
-
- if preallocate > 0 {
- glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
- }
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
- if memoryMapSizeMB > 0 {
- file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
- memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap)
+ if preallocate > 0 {
+ glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
+ }
- new_mMap := memory_map.FileMemoryMap[fileName]
- new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB))
- return file, e
- } else {
- file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
- return file, e
- }
+ if memoryMapSizeMB > 0 {
+ file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
+ return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e
} else {
- return mMap.File, nil
+ file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
+ return backend.NewDiskFile(file), e
}
+
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index be58588f2..6f1d8fe40 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/syndtr/goleveldb/leveldb/opt"
@@ -25,24 +26,27 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
fileName := v.FileName()
alreadyHasSuperBlock := false
+ // open dat file
if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
+ var dataFile *os.File
if canWrite {
- v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
- v.dataFile, e = os.Open(fileName + ".dat")
+ dataFile, e = os.Open(fileName + ".dat")
v.readOnly = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
if fileSize >= _SuperBlockSize {
alreadyHasSuperBlock = true
}
+ v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
- v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+ v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
} else {
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 7c9539021..d9b79795b 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -26,7 +26,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize {
oldNeedle := new(needle.Needle)
- err := oldNeedle.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+ err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
if err != nil {
glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err)
return false
@@ -45,12 +45,6 @@ func (v *Volume) Destroy() (err error) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}
- mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
- if exists {
- mMap.DeleteFileAndMemoryMap()
- delete(memory_map.FileMemoryMap, v.dataFile.Name())
- }
-
v.Close()
os.Remove(v.FileName() + ".dat")
os.Remove(v.FileName() + ".idx")
@@ -64,7 +58,7 @@ func (v *Volume) Destroy() (err error) {
func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly {
- err = fmt.Errorf("%s is read-only", v.dataFile.Name())
+ err = fmt.Errorf("%s is read-only", v.DataBackend.String())
return
}
v.dataFileAccessLock.Lock()
@@ -83,7 +77,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
// check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id)
if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset())
+ existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return
@@ -97,7 +91,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
// append to dat file
n.AppendAtNs = uint64(time.Now().UnixNano())
- if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
+ if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
return
}
v.lastAppendAtNs = n.AppendAtNs
@@ -117,7 +111,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly {
- return 0, fmt.Errorf("%s is read-only", v.dataFile.Name())
+ return 0, fmt.Errorf("%s is read-only", v.DataBackend.String())
}
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@@ -127,7 +121,7 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.dataFile, v.Version())
+ offset, _, _, err := n.Append(v.DataBackend, v.Version())
if err != nil {
return size, err
}
@@ -155,7 +149,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
if nv.Size == 0 {
return 0, nil
}
- err := n.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+ err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
if err != nil {
return 0, err
}
@@ -198,21 +192,21 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
offset := int64(v.SuperBlock.BlockSize())
- return ScanVolumeFileFrom(version, v.dataFile, offset, volumeFileScanner)
+ return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
}
-func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+ n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
if e != nil {
if e == io.EOF {
return nil
}
- return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
+ return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.String(), offset, e)
}
for n != nil {
var needleBody []byte
if volumeFileScanner.ReadNeedleBody() {
- if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
+ if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
@@ -228,7 +222,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
}
offset += NeedleHeaderSize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
- if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil {
+ if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
if err == io.EOF {
return nil
}
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index cb34a2347..bce5af465 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -4,10 +4,9 @@ import (
"fmt"
"os"
- "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -77,56 +76,40 @@ func (s *SuperBlock) Initialized() bool {
func (v *Volume) maybeWriteSuperBlock() error {
- mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()]
- if exists {
- if mMap.End_of_file == -1 {
- v.SuperBlock.version = needle.CurrentVersion
- mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes())
- }
- return nil
- } else {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e)
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.version = needle.CurrentVersion
- _, e = v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0)
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
- }
+ datSize, _, e := v.DataBackend.GetStat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
+ return e
+ }
+ if datSize == 0 {
+ v.SuperBlock.version = needle.CurrentVersion
+ _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ var dataFile *os.File
+ if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
+ v.DataBackend = backend.NewDiskFile(dataFile)
+ if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
+ v.readOnly = false
}
}
}
- return e
}
+ return e
}
func (v *Volume) readSuperBlock() (err error) {
- v.SuperBlock, err = ReadSuperBlock(v.dataFile)
+ v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
return err
}
// ReadSuperBlock reads from data file and load it into volume's super block
-func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
+func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
- mMap, exists := memory_map.FileMemoryMap[dataFile.Name()]
- if exists {
- header, err = mMap.ReadMemory(0, _SuperBlockSize)
- if err != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), err)
- return
- }
- } else {
- if _, e := dataFile.ReadAt(header, 0); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e)
- return
- }
+ if _, e := datBackend.ReadAt(header, 0); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
+ return
}
superBlock.version = needle.Version(header[0])
@@ -144,7 +127,7 @@ func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) {
superBlock.Extra = &master_pb.SuperBlockExtra{}
err = proto.Unmarshal(extraData, superBlock.Extra)
if err != nil {
- err = fmt.Errorf("cannot read volume %s super block extra: %v", dataFile.Name(), err)
+ err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.String(), err)
return
}
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0e5130572..e90746b54 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
@@ -76,10 +77,10 @@ func (v *Volume) CommitCompact() error {
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
v.nm.Close()
- if err := v.dataFile.Close(); err != nil {
+ if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)
}
- v.dataFile = nil
+ v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
var e error
@@ -131,8 +132,8 @@ func (v *Volume) cleanupCompact() error {
return nil
}
-func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) {
- superBlock, err := ReadSuperBlock(file)
+func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) {
+ superBlock, err := ReadSuperBlock(datBackend)
if err != nil {
return 0, err
}
@@ -146,7 +147,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
defer oldIdxFile.Close()
oldDatFile, err := os.Open(oldDatFileName)
- defer oldDatFile.Close()
+ oldDatBackend := backend.NewDiskFile(oldDatFile)
+ defer oldDatBackend.Close()
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
@@ -155,7 +157,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return nil
}
- oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile)
+ oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)
if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
}
@@ -196,7 +198,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil {
return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err)
}
- defer dst.Close()
+ dstDatBackend := backend.NewDiskFile(dst)
+ defer dstDatBackend.Close()
if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil {
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
@@ -204,7 +207,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
defer idx.Close()
var newDatCompactRevision uint16
- newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst)
+ newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dstDatBackend)
if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err)
}
@@ -235,7 +238,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
//even the needle cache in memory is hit, the need_bytes is correct
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
var needleBytes []byte
- needleBytes, err = needle.ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+ needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
}
@@ -247,7 +250,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
fakeDelNeedle.Id = key
fakeDelNeedle.Cookie = 0x12345678
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano())
- _, _, _, err = fakeDelNeedle.Append(dst, v.Version())
+ _, _, _, err = fakeDelNeedle.Append(dstDatBackend, v.Version())
if err != nil {
return fmt.Errorf("append deleted %d failed: %v", key, err)
}
@@ -267,7 +270,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
- dst *os.File
+ dstBackend backend.DataStorageBackend
nm *NeedleMap
newOffset int64
now uint64
@@ -277,7 +280,7 @@ type VolumeFileScanner4Vacuum struct {
func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
scanner.version = superBlock.Version()
superBlock.CompactionRevision++
- _, err := scanner.dst.Write(superBlock.Bytes())
+ _, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
scanner.newOffset = int64(superBlock.BlockSize())
return err
@@ -296,7 +299,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil {
+ if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
delta := n.DiskSize(scanner.version)
@@ -309,9 +312,10 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dst, idx *os.File
+ dst backend.DataStorageBackend
+ idx *os.File
)
- if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapMaxSizeMb); err != nil {
+ if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
@@ -325,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
v: v,
now: uint64(time.Now().Unix()),
nm: NewBtreeNeedleMap(idx),
- dst: dst,
+ dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
@@ -339,7 +343,8 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
}
- defer dst.Close()
+ dstDatBackend := backend.NewDiskFile(dst)
+ defer dstDatBackend.Close()
if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
return
@@ -369,7 +374,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
}
n := new(needle.Needle)
- err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version())
+ err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
if err != nil {
return nil
}
@@ -383,7 +388,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, _, err = n.Append(dst, v.Version()); err != nil {
+ if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
newOffset += n.DiskSize(v.Version())