aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2021-02-23 13:41:30 +0800
committerGitHub <noreply@github.com>2021-02-23 13:41:30 +0800
commit620b91f23eaf5718088dc9ddcf91540967d0c8a6 (patch)
tree04e92a8f92b548e26080040d009f23a51d9cc521 /weed/storage
parent690d7c10b826b53bf823faef76603cd6ad83aa1d (diff)
parent90cdf9dcace5595b31104df3a3b7e4038a7db341 (diff)
downloadseaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.tar.xz
seaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.zip
Merge pull request #73 from chrislusf/master
sync
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/backend/disk_file.go42
-rw-r--r--weed/storage/disk_location.go13
-rw-r--r--weed/storage/disk_location_ec.go6
-rw-r--r--weed/storage/erasure_coding/ec_decoder.go2
-rw-r--r--weed/storage/erasure_coding/ec_shard.go6
-rw-r--r--weed/storage/erasure_coding/ec_test.go4
-rw-r--r--weed/storage/erasure_coding/ec_volume.go8
-rw-r--r--weed/storage/erasure_coding/ec_volume_info.go6
-rw-r--r--weed/storage/erasure_coding/ec_volume_test.go6
-rw-r--r--weed/storage/needle/needle_read_write.go10
-rw-r--r--weed/storage/needle/needle_read_write_test.go2
-rw-r--r--weed/storage/needle_map.go13
-rw-r--r--weed/storage/needle_map_leveldb.go5
-rw-r--r--weed/storage/needle_map_memory.go5
-rw-r--r--weed/storage/store.go53
-rw-r--r--weed/storage/store_ec.go6
-rw-r--r--weed/storage/super_block/replica_placement.go6
-rw-r--r--weed/storage/types/offset_4bytes.go2
-rw-r--r--weed/storage/types/offset_5bytes.go2
-rw-r--r--weed/storage/types/volume_disk_type.go40
-rw-r--r--weed/storage/volume.go9
-rw-r--r--weed/storage/volume_backup.go8
-rw-r--r--weed/storage/volume_checking.go18
-rw-r--r--weed/storage/volume_info.go4
-rw-r--r--weed/storage/volume_loading.go8
-rw-r--r--weed/storage/volume_read_write.go18
-rw-r--r--weed/storage/volume_vacuum.go12
27 files changed, 226 insertions, 88 deletions
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 2b04c8df2..498963c31 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -1,6 +1,8 @@
package backend
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"time"
)
@@ -12,12 +14,25 @@ var (
type DiskFile struct {
File *os.File
fullFilePath string
+ fileSize int64
+ modTime time.Time
}
func NewDiskFile(f *os.File) *DiskFile {
+ stat, err := f.Stat()
+ if err != nil {
+ glog.Fatalf("stat file %s: %v", f.Name(), err)
+ }
+ offset := stat.Size()
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ }
+
return &DiskFile{
fullFilePath: f.Name(),
File: f,
+ fileSize: offset,
+ modTime: stat.ModTime(),
}
}
@@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
}
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
- return df.File.WriteAt(p, off)
+ n, err = df.File.WriteAt(p, off)
+ if err == nil {
+ waterMark := off + int64(n)
+ if waterMark > df.fileSize {
+ df.fileSize = waterMark
+ df.modTime = time.Now()
+ }
+ }
+ return
+}
+
+func (df *DiskFile) Append(p []byte) (n int, err error) {
+ return df.WriteAt(p, df.fileSize)
}
func (df *DiskFile) Truncate(off int64) error {
- return df.File.Truncate(off)
+ err := df.File.Truncate(off)
+ if err == nil {
+ df.fileSize = off
+ df.modTime = time.Now()
+ }
+ return err
}
func (df *DiskFile) Close() error {
@@ -38,11 +70,7 @@ func (df *DiskFile) Close() error {
}
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
- stat, e := df.File.Stat()
- if e == nil {
- return stat.Size(), stat.ModTime(), nil
- }
- return 0, time.Time{}, err
+ return df.fileSize, df.modTime, nil
}
func (df *DiskFile) Name() string {
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 9b2ab69fe..6de87c793 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"os"
"path/filepath"
@@ -19,6 +20,7 @@ import (
type DiskLocation struct {
Directory string
IdxDirectory string
+ DiskType types.DiskType
MaxVolumeCount int
OriginalMaxVolumeCount int
MinFreeSpacePercent float32
@@ -32,7 +34,7 @@ type DiskLocation struct {
isDiskSpaceLow bool
}
-func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation {
+func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation {
dir = util.ResolvePath(dir)
if idxDir == "" {
idxDir = dir
@@ -42,6 +44,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
location := &DiskLocation{
Directory: dir,
IdxDirectory: idxDir,
+ DiskType: diskType,
MaxVolumeCount: maxVolumeCount,
OriginalMaxVolumeCount: maxVolumeCount,
MinFreeSpacePercent: minFreeSpacePercent,
@@ -82,7 +85,7 @@ func getValidVolumeName(basename string) string {
return ""
}
-func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
+func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool {
basename := fileInfo.Name()
if fileInfo.IsDir() {
return false
@@ -133,7 +136,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
return true
}
-func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
+func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
task_queue := make(chan os.FileInfo, 10*concurrency)
go func() {
@@ -167,7 +170,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
}
-func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
l.concurrentLoadingVolumes(needleMapKind, 10)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
@@ -237,7 +240,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro
return
}
-func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
+func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
if fileInfo, found := l.LocateVolume(vid); found {
return l.loadExistingVolume(fileInfo, needleMapKind)
}
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index d1237b40f..91c7d86a6 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -14,7 +14,7 @@ import (
)
var (
- re = regexp.MustCompile("\\.ec[0-9][0-9]")
+ re = regexp.MustCompile(`\.ec[0-9][0-9]`)
)
func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
@@ -57,7 +57,7 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) {
- ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId)
+ ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
if err != nil {
if err == os.ErrNotExist {
return os.ErrNotExist
@@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
- ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid)
+ ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index bc86d9c04..47d3c6550 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -58,7 +58,7 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64,
return nil
}
- entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+ entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
if datSize < entryStopOffset {
datSize = entryStopOffset
}
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index 74ed99198..2a57d85ef 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -2,6 +2,7 @@ package erasure_coding
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"path"
"strconv"
@@ -20,11 +21,12 @@ type EcVolumeShard struct {
dir string
ecdFile *os.File
ecdFileSize int64
+ DiskType types.DiskType
}
-func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
+func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
- v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
+ v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
baseFileName := v.FileName()
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 63cc2c352..0d48bec02 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -93,7 +93,7 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
data := make([]byte, size)
- n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
+ n, err := datFile.ReadAt(data, offset.ToActualOffset())
if err != nil {
return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
}
@@ -105,7 +105,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte
func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
- intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
for i, interval := range intervals {
if d, e := readOneInterval(interval, ecFiles); e != nil {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 2183e43d6..85d6a5fc8 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -36,10 +36,11 @@ type EcVolume struct {
Version needle.Version
ecjFile *os.File
ecjFileAccessLock sync.Mutex
+ diskType types.DiskType
}
-func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid}
+func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
@@ -191,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
+ DiskType: string(ev.diskType),
}
messages = append(messages, m)
}
@@ -211,7 +213,7 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index 8ff65bb0f..3dd535e64 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -10,13 +10,15 @@ type EcVolumeInfo struct {
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
+ DiskType string
}
-func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
+func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
+ DiskType: diskType,
}
}
@@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
VolumeId: ecInfo.VolumeId,
Collection: ecInfo.Collection,
ShardBits: ecInfo.ShardBits.Minus(other.ShardBits),
+ DiskType: ecInfo.DiskType,
}
return ret
@@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
Id: uint32(ecInfo.VolumeId),
EcIndexBits: uint32(ecInfo.ShardBits),
Collection: ecInfo.Collection,
+ DiskType: ecInfo.DiskType,
}
}
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
index fe45bf722..747ef4aab 100644
--- a/weed/storage/erasure_coding/ec_volume_test.go
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -35,16 +35,16 @@ func TestPositioning(t *testing.T) {
needleId, _ := types.ParseNeedleId(test.needleId)
offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
- fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
}
needleId, _ := types.ParseNeedleId("0f087622")
offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
- fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
- intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
for _, interval := range intervals {
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index e758a6fee..0f72bc0bb 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -161,7 +161,15 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
- _, err = r.ReadAt(dataSlice, offset)
+ var n int
+ n, err = r.ReadAt(dataSlice, offset)
+ if err != nil && int64(n) == dataSize {
+ err = nil
+ }
+ if err != nil {
+ fileSize, _, _ := r.GetStat()
+ println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
+ }
return dataSlice, err
}
diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go
index 47582dd26..afcea5a05 100644
--- a/weed/storage/needle/needle_read_write_test.go
+++ b/weed/storage/needle/needle_read_write_test.go
@@ -48,7 +48,7 @@ func TestAppend(t *testing.T) {
int64 : -9223372036854775808 to 9223372036854775807
*/
- fileSize := int64(4294967295) + 10000
+ fileSize := int64(4294967296) + 10000
tempFile.Truncate(fileSize)
defer func() {
tempFile.Close()
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 9f331267d..d35391f66 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -1,7 +1,6 @@
package storage
import (
- "fmt"
"io"
"os"
"sync"
@@ -11,10 +10,10 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
-type NeedleMapType int
+type NeedleMapKind int
const (
- NeedleMapInMemory NeedleMapType = iota
+ NeedleMapInMemory NeedleMapKind = iota
NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer
@@ -41,6 +40,7 @@ type baseNeedleMapper struct {
indexFile *os.File
indexFileAccessLock sync.Mutex
+ indexFileOffset int64
}
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
@@ -56,11 +56,10 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot seek end of indexfile %s: %v",
- nm.indexFile.Name(), err)
+ written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset)
+ if err == nil {
+ nm.indexFileOffset += int64(written)
}
- _, err := nm.indexFile.Write(bytes)
return err
}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 415cd14dd..9716e9729 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -31,6 +31,11 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
generateLevelDbFile(dbFileName, indexFile)
glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
}
+ if stat, err := indexFile.Stat(); err != nil {
+ glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
+ } else {
+ m.indexFileOffset = stat.Size()
+ }
glog.V(1).Infof("Opening %s...", dbFileName)
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index d0891dc98..1b58708c6 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
m: needle_map.NewCompactMap(),
}
nm.indexFile = file
+ stat, err := file.Stat()
+ if err != nil {
+ glog.Fatalf("stat file %s: %v", file.Name(), err)
+ }
+ nm.indexFileOffset = stat.Size()
return nm
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index ff28be47c..47829666a 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -40,7 +40,7 @@ type Store struct {
dataCenter string // optional informaton, overwriting master setting if exists
rack string // optional information, overwriting master setting if exists
connected bool
- NeedleMapType NeedleMapType
+ NeedleMapKind NeedleMapKind
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
@@ -52,11 +52,11 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) {
- s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
+func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
+ s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
- location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
+ location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
-func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
+func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
- e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
+ e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
}
return nil
}
-func (s *Store) FindFreeLocation() (ret *DiskLocation) {
+func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
+ if diskType != location.DiskType {
+ continue
+ }
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
@@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
- if location := s.FindFreeLocation(); location != nil {
+ if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
@@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
ReplicaPlacement: uint32(replicaPlacement.Byte()),
Version: uint32(volume.Version()),
Ttl: ttl.ToUint32(),
+ DiskType: string(diskType),
}
return nil
} else {
@@ -169,6 +173,7 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) {
ReadOnly: v.IsReadOnly(),
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
+ DiskType: v.DiskType().String(),
}
s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
@@ -202,13 +207,13 @@ func (s *Store) GetRack() string {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
- maxVolumeCount := 0
+ maxVolumeCounts := make(map[string]uint32)
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
- maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount)
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
@@ -280,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
return &master_pb.Heartbeat{
- Ip: s.Ip,
- Port: uint32(s.Port),
- PublicUrl: s.PublicUrl,
- MaxVolumeCount: uint32(maxVolumeCount),
- MaxFileKey: NeedleIdToUint64(maxFileKey),
- DataCenter: s.dataCenter,
- Rack: s.rack,
- Volumes: volumeMessages,
- HasNoVolumes: len(volumeMessages) == 0,
+ Ip: s.Ip,
+ Port: uint32(s.Port),
+ PublicUrl: s.PublicUrl,
+ MaxVolumeCounts: maxVolumeCounts,
+ MaxFileKey: NeedleIdToUint64(maxFileKey),
+ DataCenter: s.dataCenter,
+ Rack: s.rack,
+ Volumes: volumeMessages,
+ HasNoVolumes: len(volumeMessages) == 0,
}
}
@@ -362,7 +367,7 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
func (s *Store) MountVolume(i needle.VolumeId) error {
for _, location := range s.Locations {
- if found := location.LoadVolume(i, s.NeedleMapType); found == true {
+ if found := location.LoadVolume(i, s.NeedleMapKind); found == true {
glog.V(0).Infof("mount volume %d", i)
v := s.findVolume(i)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
@@ -371,6 +376,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
return nil
}
@@ -390,6 +396,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
@@ -414,6 +421,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
if err := location.DeleteVolume(i); err == nil {
@@ -463,6 +471,9 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
+ if volumeSizeLimit == 0 {
+ return
+ }
for _, diskLocation := range s.Locations {
if diskLocation.OriginalMaxVolumeCount == 0 {
currentMaxVolumeCount := diskLocation.MaxVolumeCount
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 853757ce3..a9b6a8ff3 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -58,6 +58,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ DiskType: string(location.DiskType),
}
return nil
} else if err == os.ErrNotExist {
@@ -82,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
Id: uint32(vid),
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ DiskType: string(ecShard.DiskType),
}
for _, location := range s.Locations {
@@ -131,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted
}
- glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
+ glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
if len(intervals) > 1 {
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
@@ -144,7 +146,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted
}
- err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
+ err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("readbytes: %v", err)
}
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index fcccbba7d..65ec53819 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -6,9 +6,9 @@ import (
)
type ReplicaPlacement struct {
- SameRackCount int
- DiffRackCount int
- DiffDataCenterCount int
+ SameRackCount int `json:"node,omitempty"`
+ DiffRackCount int `json:"rack,omitempty"`
+ DiffDataCenterCount int `json:"dc,omitempty"`
}
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go
index d53147e21..5348d5b36 100644
--- a/weed/storage/types/offset_4bytes.go
+++ b/weed/storage/types/offset_4bytes.go
@@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset {
return Uint32ToOffset(smaller)
}
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize)
}
diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go
index 05c6d2f39..b6181fc11 100644
--- a/weed/storage/types/offset_5bytes.go
+++ b/weed/storage/types/offset_5bytes.go
@@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset {
}
}
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize)
}
diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go
new file mode 100644
index 000000000..c9b87d802
--- /dev/null
+++ b/weed/storage/types/volume_disk_type.go
@@ -0,0 +1,40 @@
+package types
+
+import (
+ "strings"
+)
+
+type DiskType string
+
+const (
+ HardDriveType DiskType = ""
+ SsdType = "ssd"
+)
+
+func ToDiskType(vt string) (diskType DiskType) {
+ vt = strings.ToLower(vt)
+ diskType = HardDriveType
+ switch vt {
+ case "", "hdd":
+ diskType = HardDriveType
+ case "ssd":
+ diskType = SsdType
+ default:
+ diskType = DiskType(vt)
+ }
+ return
+}
+
+func (diskType DiskType) String() string {
+ if diskType == "" {
+ return ""
+ }
+ return string(diskType)
+}
+
+func (diskType DiskType) ReadableString() string {
+ if diskType == "" {
+ return "hdd"
+ }
+ return string(diskType)
+}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 80a74c3e3..366449c53 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -25,7 +25,7 @@ type Volume struct {
Collection string
DataBackend backend.BackendStorageFile
nm NeedleMapper
- needleMapKind NeedleMapType
+ needleMapKind NeedleMapKind
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteLock sync.RWMutex
@@ -50,7 +50,7 @@ type Volume struct {
lastIoError error
}
-func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
@@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
+func (v *Volume) DiskType() types.DiskType {
+ return v.location.DiskType
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
+ DiskType: string(v.location.DiskType),
}
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 9aeb10f69..82ea12a89 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -154,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
if err != nil {
- return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err)
+ return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
}
- _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
if err != nil {
- return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
+ return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
}
return n.AppendAtNs, nil
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 00e04047f..b76933083 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"io"
"os"
@@ -58,7 +59,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
}
} else {
- if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
+ if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
return lastAppendAtNs, err
}
}
@@ -148,3 +149,18 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V
}
return n.AppendAtNs, err
}
+
+func (v *Volume) checkIdxFile() error {
+ datFileSize, _, err := v.DataBackend.GetStat()
+ if err != nil {
+ return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err)
+ }
+ if datFileSize <= super_block.SuperBlockSize {
+ return nil
+ }
+ indexFileName := v.FileName(".idx")
+ if util.FileExists(indexFileName) {
+ return nil
+ }
+ return fmt.Errorf("idx file %s does not exists", indexFileName)
+}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index 313818cde..9c64c9682 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -14,6 +14,7 @@ type VolumeInfo struct {
Size uint64
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
+ DiskType string
Collection string
Version needle.Version
FileCount int
@@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
+ DiskType: m.DiskType,
}
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
@@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
}
vi.ReplicaPlacement = rp
vi.Ttl = needle.LoadTTLFromUint32(m.Ttl)
+ vi.DiskType = m.DiskType
return vi, nil
}
@@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey,
+ DiskType: vi.DiskType,
}
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index a6efc630d..bff1055bb 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -14,7 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
@@ -22,7 +22,7 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
alreadyHasSuperBlock := false
hasLoadedVolume := false
@@ -96,6 +96,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.dirIdx = v.dir
}
}
+ // check volume idx files
+ if err := v.checkIdxFile(); err != nil {
+ glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
+ }
var indexFile *os.File
if v.noWriteOrDelete {
glog.V(0).Infoln("open to read file", v.FileName(".idx"))
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index f28ee50e6..07376bc88 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -41,9 +41,9 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
oldNeedle := new(needle.Needle)
- err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+ err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
if err != nil {
- glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err)
+ glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
return false
}
if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
@@ -113,7 +113,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
// check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id)
if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
+ existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return
@@ -136,7 +136,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
v.lastAppendAtNs = n.AppendAtNs
// add to needle map
- if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
+ if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
@@ -179,7 +179,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
// check whether existing needle cookie matches
nv, ok := v.nm.Get(n.Id)
if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
+ existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
if existingNeedleReadErr != nil {
err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
return
@@ -201,7 +201,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
v.lastAppendAtNs = n.AppendAtNs
// add to needle map
- if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
+ if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
@@ -303,9 +303,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if readSize == 0 {
return 0, nil
}
- err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
+ err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
- err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
v.checkReadWriteError(err)
if err != nil {
@@ -410,7 +410,7 @@ type VolumeFileScanner interface {
}
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
- needleMapKind NeedleMapType,
+ needleMapKind NeedleMapKind,
volumeFileScanner VolumeFileScanner) (err error) {
var v *Volume
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 5884eca87..0ee1e61c6 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -280,13 +280,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
//updated needle
if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
//even the needle cache in memory is hit, the need_bytes is correct
- glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
+ glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size)
var needleBytes []byte
- needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+ needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
if err != nil {
- return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
+ return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
- dst.Write(needleBytes)
+ dstDatBackend.Append(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
@@ -339,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
+ if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
@@ -422,7 +422,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
}
n := new(needle.Needle)
- err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
+ err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
if err != nil {
return nil
}