diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/backup.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy_incremental.go | 8 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 2 | ||||
| -rw-r--r-- | weed/storage/backend/backend.go | 15 | ||||
| -rw-r--r-- | weed/storage/backend/disk_file.go | 50 | ||||
| -rw-r--r-- | weed/storage/backend/memory_map/memory_map.go | 2 | ||||
| -rw-r--r-- | weed/storage/backend/memory_map/memory_map_backend.go | 60 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read_write.go | 73 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read_write_test.go | 6 | ||||
| -rw-r--r-- | weed/storage/store.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume.go | 27 | ||||
| -rw-r--r-- | weed/storage/volume_backup.go | 17 | ||||
| -rw-r--r-- | weed/storage/volume_checking.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_create.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_create_linux.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_create_windows.go | 32 | ||||
| -rw-r--r-- | weed/storage/volume_loading.go | 10 | ||||
| -rw-r--r-- | weed/storage/volume_read_write.go | 34 | ||||
| -rw-r--r-- | weed/storage/volume_super_block.go | 63 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 43 |
20 files changed, 272 insertions, 191 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 505de4ae6..cef2bbe3a 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -128,7 +128,7 @@ func runBackup(cmd *Command, args []string) bool { return true } v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision) - v.DataFile().WriteAt(v.SuperBlock.Bytes(), 0) + v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) } datSize, _, _ := v.FileStat() diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index f56fbeef4..6c5bb8a62 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io" - "os" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) @@ -30,7 +30,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem startOffset := foundOffset.ToAcutalOffset() buf := make([]byte, 1024*1024*2) - return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream) + return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream) } @@ -47,10 +47,10 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server } -func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { +func sendFileContent(datBackend backend.DataStorageBackend, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { var blockSizeLimit = int64(len(buf)) for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit { - n, readErr := datFile.ReadAt(buf, startOffset+i) + n, readErr := datBackend.ReadAt(buf, startOffset+i) if readErr == nil || readErr == io.EOF { resp := &volume_server_pb.VolumeIncrementalCopyResponse{} resp.FileContent = buf[:int64(n)] diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index cb0d320ad..1bf61e1c7 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -71,7 +71,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe stream: stream, } - err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner) + err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner) return scanner.lastProcessedTimestampNs, err diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go new file mode 100644 index 000000000..ae0f84216 --- /dev/null +++ b/weed/storage/backend/backend.go @@ -0,0 +1,15 @@ +package backend + +import ( + "io" + "time" +) + +type DataStorageBackend interface { + io.ReaderAt + io.WriterAt + Truncate(off int64) error + io.Closer + GetStat() (datSize int64, modTime time.Time, err error) + String() string +} diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go new file mode 100644 index 000000000..7f2b39d15 --- /dev/null +++ b/weed/storage/backend/disk_file.go @@ -0,0 +1,50 @@ +package backend + +import ( + "os" + "time" +) + +var ( + _ DataStorageBackend = &DiskFile{} +) + +type DiskFile struct { + File *os.File + fullFilePath string +} + +func NewDiskFile(f *os.File) *DiskFile { + return &DiskFile{ + fullFilePath: f.Name(), + File: f, + } +} + +func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { + return df.File.ReadAt(p, off) +} + +func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { + return df.File.WriteAt(p, off) +} + +func (df *DiskFile) Truncate(off int64) error { + return df.File.Truncate(off) +} + +func (df *DiskFile) Close() error { + return df.File.Close() +} + +func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { + stat, e := df.File.Stat() + if e == nil { + return stat.Size(), stat.ModTime(), nil + } + return 0, time.Time{}, err +} + +func (df *DiskFile) String() string { + return df.fullFilePath +} diff --git a/weed/storage/backend/memory_map/memory_map.go b/weed/storage/backend/memory_map/memory_map.go index e940fcc0e..5dc7ba33d 100644 --- a/weed/storage/backend/memory_map/memory_map.go +++ b/weed/storage/backend/memory_map/memory_map.go @@ -21,8 +21,6 @@ type MemoryMap struct { End_of_file int64 } -var FileMemoryMap = make(map[string]*MemoryMap) - func ReadMemoryMapMaxSizeMb(memoryMapMaxSizeMbString string) (uint32, error) { if memoryMapMaxSizeMbString == "" { return 0, nil diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go new file mode 100644 index 000000000..d999b917e --- /dev/null +++ b/weed/storage/backend/memory_map/memory_map_backend.go @@ -0,0 +1,60 @@ +package memory_map + +import ( + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/storage/backend" +) + +var ( + _ backend.DataStorageBackend = &MemoryMappedFile{} +) + +type MemoryMappedFile struct { + mm *MemoryMap +} + +func NewMemoryMappedFile(f *os.File, memoryMapSizeMB uint32) *MemoryMappedFile { + mmf := &MemoryMappedFile{ + mm : new(MemoryMap), + } + mmf.mm.CreateMemoryMap(f, 1024*1024*uint64(memoryMapSizeMB)) + return mmf +} + +func (mmf *MemoryMappedFile) ReadAt(p []byte, off int64) (n int, err error) { + readBytes, e := mmf.mm.ReadMemory(uint64(off), uint64(len(p))) + if e != nil { + return 0, e + } + // TODO avoid the extra copy + copy(p, readBytes) + return len(readBytes), nil +} + +func (mmf *MemoryMappedFile) WriteAt(p []byte, off int64) (n int, err error) { + mmf.mm.WriteMemory(uint64(off), uint64(len(p)), p) + return len(p), nil +} + +func (mmf *MemoryMappedFile) Truncate(off int64) error { + return nil +} + +func (mmf *MemoryMappedFile) Close() error { + mmf.mm.DeleteFileAndMemoryMap() + return nil +} + +func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err error) { + stat, e := mmf.mm.File.Stat() + if e == nil { + return stat.Size(), stat.ModTime(), nil + } + return 0, time.Time{}, err +} + +func (mmf *MemoryMappedFile) String() string { + return mmf.mm.File.Name() +} diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 04308a8a7..8e5d18b1a 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -4,12 +4,10 @@ import ( "errors" "fmt" "io" - "os" - "math" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" + "github.com/chrislusf/seaweedfs/weed/storage/backend" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -127,53 +125,39 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version) } -func (n *Needle) Append(w *os.File, version Version) (offset uint64, size uint32, actualSize int64, err error) { +func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) { - mMap, exists := memory_map.FileMemoryMap[w.Name()] - if !exists { - if end, e := w.Seek(0, io.SeekEnd); e == nil { - defer func(w *os.File, off int64) { - if err != nil { - if te := w.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) - } + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.DataStorageBackend, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te) } - }(w, end) - offset = uint64(end) - } else { - err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) - return - } + } + }(w, end) + offset = uint64(end) } else { - offset = uint64(mMap.End_of_file + 1) + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return } bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) if err == nil { - if exists { - mMap.WriteMemory(offset, uint64(len(bytesToWrite)), bytesToWrite) - } else { - _, err = w.WriteAt(bytesToWrite, int64(offset)) - } + _, err = w.WriteAt(bytesToWrite, int64(offset)) } return offset, size, actualSize, err } -func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) { +func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) - mMap, exists := memory_map.FileMemoryMap[r.Name()] - if exists { - dataSlice, err := mMap.ReadMemory(uint64(offset), uint64(dataSize)) - return dataSlice, err - } else { - _, err = r.ReadAt(dataSlice, offset) - return dataSlice, err - } + _, err = r.ReadAt(dataSlice, offset) + return dataSlice, err + } // ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set. @@ -207,7 +191,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers } // ReadData hydrates the needle from the file, with only n.Id is set. -func (n *Needle) ReadData(r *os.File, offset int64, size uint32, version Version) (err error) { +func (n *Needle) ReadData(r backend.DataStorageBackend, offset int64, size uint32, version Version) (err error) { bytes, err := ReadNeedleBlob(r, offset, size, version) if err != nil { return err @@ -282,24 +266,17 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { return nil } -func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { +func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { n = new(Needle) if version == Version1 || version == Version2 || version == Version3 { bytes = make([]byte, NeedleHeaderSize) - mMap, exists := memory_map.FileMemoryMap[r.Name()] - if exists { - bytes, err = mMap.ReadMemory(uint64(offset), NeedleHeaderSize) - if err != nil { - return nil, bytes, 0, err - } - } else { - var count int - count, err = r.ReadAt(bytes, offset) - if count <= 0 || err != nil { - return nil, bytes, 0, err - } + var count int + count, err = r.ReadAt(bytes, offset) + if count <= 0 || err != nil { + return nil, bytes, 0, err } + n.ParseNeedleHeader(bytes) bodyLength = NeedleBodyLength(n.Size, version) } @@ -324,7 +301,7 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 { //n should be a needle already read the header //the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { +func (n *Needle) ReadNeedleBody(r backend.DataStorageBackend, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { if bodyLength <= 0 { return nil, nil diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go index 4c507f9e6..47582dd26 100644 --- a/weed/storage/needle/needle_read_write_test.go +++ b/weed/storage/needle/needle_read_write_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -54,7 +55,10 @@ func TestAppend(t *testing.T) { os.Remove(tempFile.Name()) }() - offset, _, _, _ := n.Append(tempFile, CurrentVersion) + datBackend := backend.NewDiskFile(tempFile) + defer datBackend.Close() + + offset, _, _, _ := n.Append(datBackend, CurrentVersion) if offset != uint64(fileSize) { t.Errorf("Fail to Append Needle.") } diff --git a/weed/storage/store.go b/weed/storage/store.go index 1d909c23a..4d1061bed 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -101,14 +101,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.FindFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, MemoryMapMaxSizeMb); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 6084b4df0..e85696eab 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -5,10 +5,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" - "os" "path" "strconv" "sync" @@ -21,7 +21,7 @@ type Volume struct { Id needle.VolumeId dir string Collection string - dataFile *os.File + DataBackend backend.DataStorageBackend nm NeedleMapper needleMapKind NeedleMapType readOnly bool @@ -39,16 +39,16 @@ type Volume struct { isCompacting bool } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, MemoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: MemoryMapMaxSizeMb} + v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) return } func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -63,9 +63,6 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) { func (v *Volume) FileName() (fileName string) { return VolumeFileName(v.dir, v.Collection, int(v.Id)) } -func (v *Volume) DataFile() *os.File { - return v.dataFile -} func (v *Volume) Version() needle.Version { return v.SuperBlock.Version() @@ -75,15 +72,15 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() - if v.dataFile == nil { + if v.DataBackend == nil { return } - stat, e := v.dataFile.Stat() + datFileSize, modTime, e := v.DataBackend.GetStat() if e == nil { - return uint64(stat.Size()), v.nm.IndexFileSize(), stat.ModTime() + return uint64(datFileSize), v.nm.IndexFileSize(), modTime } - glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) + glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.String(), e) return // -1 causes integer overflow and the volume to become unwritable. } @@ -149,9 +146,9 @@ func (v *Volume) Close() { v.nm.Close() v.nm = nil } - if v.dataFile != nil { - _ = v.dataFile.Close() - v.dataFile = nil + if v.DataBackend != nil { + _ = v.DataBackend.Close() + v.DataBackend = nil stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() } } diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index f48ccbb68..fe0506917 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -19,8 +19,8 @@ func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusRespons defer v.dataFileAccessLock.Unlock() var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{} - if stat, err := v.dataFile.Stat(); err == nil { - syncStatus.TailOffset = uint64(stat.Size()) + if datSize, _, err := v.DataBackend.GetStat(); err == nil { + syncStatus.TailOffset = uint64(datSize) } syncStatus.Collection = v.Collection syncStatus.IdxFileSize = v.nm.IndexFileSize() @@ -70,6 +70,8 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial return err } + writeOffset := int64(startFromOffset) + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ @@ -80,8 +82,6 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial return err } - v.dataFile.Seek(int64(startFromOffset), io.SeekStart) - for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -92,10 +92,11 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial } } - _, writeErr := v.dataFile.Write(resp.FileContent) + n, writeErr := v.DataBackend.WriteAt(resp.FileContent, writeOffset) if writeErr != nil { return writeErr } + writeOffset += int64(n) } return nil @@ -107,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial } // add to needle map - return ScanVolumeFileFrom(v.version, v.dataFile, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) + return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) } @@ -153,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := needle.ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } - _, err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) if err != nil { return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 8f930546f..61b59e9f7 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" @@ -29,7 +30,7 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin if size == TombstoneFileSize { size = 0 } - if lastAppendAtNs, e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { + if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) } return @@ -54,7 +55,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } -func verifyNeedleIntegrity(datFile *os.File, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) { +func verifyNeedleIntegrity(datFile backend.DataStorageBackend, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) { n := new(needle.Needle) if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, err diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index ef58e5871..b27a62990 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -6,12 +6,13 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } - return file, e + return backend.NewDiskFile(file), e } diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index d9dfc3862..e3305d991 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -7,13 +7,14 @@ import ( "syscall" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) } - return file, e + return backend.NewDiskFile(file), e } diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 12826f613..81536810b 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -3,36 +3,26 @@ package storage import ( - "os" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "golang.org/x/sys/windows" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (*os.File, error) { - - mMap, exists := memory_map.FileMemoryMap[fileName] - if !exists { - - if preallocate > 0 { - glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) - } +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { - if memoryMapSizeMB > 0 { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) - memory_map.FileMemoryMap[fileName] = new(memory_map.MemoryMap) + if preallocate > 0 { + glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) + } - new_mMap := memory_map.FileMemoryMap[fileName] - new_mMap.CreateMemoryMap(file, 1024*1024*uint64(memoryMapSizeMB)) - return file, e - } else { - file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) - return file, e - } + if memoryMapSizeMB > 0 { + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) + return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e } else { - return mMap.File, nil + file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) + return backend.NewDiskFile(file), e } + } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index be58588f2..6f1d8fe40 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -6,6 +6,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/syndtr/goleveldb/leveldb/opt" @@ -25,24 +26,27 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind fileName := v.FileName() alreadyHasSuperBlock := false + // open dat file if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } + var dataFile *os.File if canWrite { - v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") - v.dataFile, e = os.Open(fileName + ".dat") + dataFile, e = os.Open(fileName + ".dat") v.readOnly = true } v.lastModifiedTsSeconds = uint64(modifiedTime.Unix()) if fileSize >= _SuperBlockSize { alreadyHasSuperBlock = true } + v.DataBackend = backend.NewDiskFile(dataFile) } else { if createDatIfMissing { - v.dataFile, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) + v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) } else { return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 7c9539021..d9b79795b 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -9,7 +9,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -26,7 +26,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { oldNeedle := new(needle.Needle) - err := oldNeedle.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) + err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err) return false @@ -45,12 +45,6 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } - mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()] - if exists { - mMap.DeleteFileAndMemoryMap() - delete(memory_map.FileMemoryMap, v.dataFile.Name()) - } - v.Close() os.Remove(v.FileName() + ".dat") os.Remove(v.FileName() + ".idx") @@ -64,7 +58,7 @@ func (v *Volume) Destroy() (err error) { func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { - err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + err = fmt.Errorf("%s is read-only", v.DataBackend.String()) return } v.dataFileAccessLock.Lock() @@ -83,7 +77,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn // check whether existing needle cookie matches nv, ok := v.nm.Get(n.Id) if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset()) + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset()) if existingNeedleReadErr != nil { err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) return @@ -97,7 +91,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) - if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { + if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { return } v.lastAppendAtNs = n.AppendAtNs @@ -117,7 +111,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { - return 0, fmt.Errorf("%s is read-only", v.dataFile.Name()) + return 0, fmt.Errorf("%s is read-only", v.DataBackend.String()) } v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -127,7 +121,7 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { size := nv.Size n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) - offset, _, _, err := n.Append(v.dataFile, v.Version()) + offset, _, _, err := n.Append(v.DataBackend, v.Version()) if err != nil { return size, err } @@ -155,7 +149,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) { if nv.Size == 0 { return 0, nil } - err := n.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) + err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { return 0, err } @@ -198,21 +192,21 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, offset := int64(v.SuperBlock.BlockSize()) - return ScanVolumeFileFrom(version, v.dataFile, offset, volumeFileScanner) + return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) } -func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) +func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) { + n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) if e != nil { if e == io.EOF { return nil } - return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e) + return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.String(), offset, e) } for n != nil { var needleBody []byte if volumeFileScanner.ReadNeedleBody() { - if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil { + if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { glog.V(0).Infof("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err) //return @@ -228,7 +222,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } offset += NeedleHeaderSize + rest glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil { + if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { if err == io.EOF { return nil } diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index cb34a2347..bce5af465 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -4,10 +4,9 @@ import ( "fmt" "os" - "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -77,56 +76,40 @@ func (s *SuperBlock) Initialized() bool { func (v *Volume) maybeWriteSuperBlock() error { - mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()] - if exists { - if mMap.End_of_file == -1 { - v.SuperBlock.version = needle.CurrentVersion - mMap.WriteMemory(0, uint64(len(v.SuperBlock.Bytes())), v.SuperBlock.Bytes()) - } - return nil - } else { - stat, e := v.dataFile.Stat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %v", v.dataFile.Name(), e) - return e - } - if stat.Size() == 0 { - v.SuperBlock.version = needle.CurrentVersion - _, e = v.dataFile.WriteAt(v.SuperBlock.Bytes(), 0) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { - if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { - v.readOnly = false - } + datSize, _, e := v.DataBackend.GetStat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e) + return e + } + if datSize == 0 { + v.SuperBlock.version = needle.CurrentVersion + _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + var dataFile *os.File + if dataFile, e = os.Create(v.DataBackend.String()); e == nil { + v.DataBackend = backend.NewDiskFile(dataFile) + if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil { + v.readOnly = false } } } - return e } + return e } func (v *Volume) readSuperBlock() (err error) { - v.SuperBlock, err = ReadSuperBlock(v.dataFile) + v.SuperBlock, err = ReadSuperBlock(v.DataBackend) return err } // ReadSuperBlock reads from data file and load it into volume's super block -func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { +func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) { header := make([]byte, _SuperBlockSize) - mMap, exists := memory_map.FileMemoryMap[dataFile.Name()] - if exists { - header, err = mMap.ReadMemory(0, _SuperBlockSize) - if err != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), err) - return - } - } else { - if _, e := dataFile.ReadAt(header, 0); e != nil { - err = fmt.Errorf("cannot read volume %s super block: %v", dataFile.Name(), e) - return - } + if _, e := datBackend.ReadAt(header, 0); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e) + return } superBlock.version = needle.Version(header[0]) @@ -144,7 +127,7 @@ func ReadSuperBlock(dataFile *os.File) (superBlock SuperBlock, err error) { superBlock.Extra = &master_pb.SuperBlockExtra{} err = proto.Unmarshal(extraData, superBlock.Extra) if err != nil { - err = fmt.Errorf("cannot read volume %s super block extra: %v", dataFile.Name(), err) + err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.String(), err) return } } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 0e5130572..e90746b54 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/backend" idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" @@ -76,10 +77,10 @@ func (v *Volume) CommitCompact() error { glog.V(3).Infof("Got volume %d committing lock...", v.Id) v.nm.Close() - if err := v.dataFile.Close(); err != nil { + if err := v.DataBackend.Close(); err != nil { glog.V(0).Infof("fail to close volume %d", v.Id) } - v.dataFile = nil + v.DataBackend = nil stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error @@ -131,8 +132,8 @@ func (v *Volume) cleanupCompact() error { return nil } -func fetchCompactRevisionFromDatFile(file *os.File) (compactRevision uint16, err error) { - superBlock, err := ReadSuperBlock(file) +func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) { + superBlock, err := ReadSuperBlock(datBackend) if err != nil { return 0, err } @@ -146,7 +147,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI defer oldIdxFile.Close() oldDatFile, err := os.Open(oldDatFileName) - defer oldDatFile.Close() + oldDatBackend := backend.NewDiskFile(oldDatFile) + defer oldDatBackend.Close() if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil { return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err) @@ -155,7 +157,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return nil } - oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatFile) + oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend) if err != nil { return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err) } @@ -196,7 +198,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI if dst, err = os.OpenFile(newDatFileName, os.O_RDWR, 0644); err != nil { return fmt.Errorf("open dat file %s failed: %v", newDatFileName, err) } - defer dst.Close() + dstDatBackend := backend.NewDiskFile(dst) + defer dstDatBackend.Close() if idx, err = os.OpenFile(newIdxFileName, os.O_RDWR, 0644); err != nil { return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err) @@ -204,7 +207,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI defer idx.Close() var newDatCompactRevision uint16 - newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dst) + newDatCompactRevision, err = fetchCompactRevisionFromDatFile(dstDatBackend) if err != nil { return fmt.Errorf("fetchCompactRevisionFromDatFile dst %s failed: %v", dst.Name(), err) } @@ -235,7 +238,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI //even the needle cache in memory is hit, the need_bytes is correct glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) var needleBytes []byte - needleBytes, err = needle.ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) + needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) if err != nil { return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err) } @@ -247,7 +250,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI fakeDelNeedle.Id = key fakeDelNeedle.Cookie = 0x12345678 fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano()) - _, _, _, err = fakeDelNeedle.Append(dst, v.Version()) + _, _, _, err = fakeDelNeedle.Append(dstDatBackend, v.Version()) if err != nil { return fmt.Errorf("append deleted %d failed: %v", key, err) } @@ -267,7 +270,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI type VolumeFileScanner4Vacuum struct { version needle.Version v *Volume - dst *os.File + dstBackend backend.DataStorageBackend nm *NeedleMap newOffset int64 now uint64 @@ -277,7 +280,7 @@ type VolumeFileScanner4Vacuum struct { func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error { scanner.version = superBlock.Version() superBlock.CompactionRevision++ - _, err := scanner.dst.Write(superBlock.Bytes()) + _, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0) scanner.newOffset = int64(superBlock.BlockSize()) return err @@ -296,7 +299,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil { + if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } delta := n.DiskSize(scanner.version) @@ -309,9 +312,10 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) { var ( - dst, idx *os.File + dst backend.DataStorageBackend + idx *os.File ) - if dst, err = createVolumeFile(dstName, preallocate, v.MemoryMapMaxSizeMb); err != nil { + if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil { return } defer dst.Close() @@ -325,7 +329,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca v: v, now: uint64(time.Now().Unix()), nm: NewBtreeNeedleMap(idx), - dst: dst, + dstBackend: dst, writeThrottler: util.NewWriteThrottler(compactionBytePerSecond), } err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner) @@ -339,7 +343,8 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { return } - defer dst.Close() + dstDatBackend := backend.NewDiskFile(dst) + defer dstDatBackend.Close() if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil { return @@ -369,7 +374,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { } n := new(needle.Needle) - err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version()) + err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version()) if err != nil { return nil } @@ -383,7 +388,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, _, err = n.Append(dst, v.Version()); err != nil { + if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { return fmt.Errorf("cannot append needle: %s", err) } newOffset += n.DiskSize(v.Version()) |
