diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-02-23 13:41:30 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-23 13:41:30 +0800 |
| commit | 620b91f23eaf5718088dc9ddcf91540967d0c8a6 (patch) | |
| tree | 04e92a8f92b548e26080040d009f23a51d9cc521 /weed/storage | |
| parent | 690d7c10b826b53bf823faef76603cd6ad83aa1d (diff) | |
| parent | 90cdf9dcace5595b31104df3a3b7e4038a7db341 (diff) | |
| download | seaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.tar.xz seaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.zip | |
Merge pull request #73 from chrislusf/master
sync
Diffstat (limited to 'weed/storage')
27 files changed, 226 insertions, 88 deletions
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 2b04c8df2..498963c31 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -1,6 +1,8 @@ package backend import ( + "github.com/chrislusf/seaweedfs/weed/glog" + . "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "time" ) @@ -12,12 +14,25 @@ var ( type DiskFile struct { File *os.File fullFilePath string + fileSize int64 + modTime time.Time } func NewDiskFile(f *os.File) *DiskFile { + stat, err := f.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", f.Name(), err) + } + offset := stat.Size() + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + } + return &DiskFile{ fullFilePath: f.Name(), File: f, + fileSize: offset, + modTime: stat.ModTime(), } } @@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { } func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { - return df.File.WriteAt(p, off) + n, err = df.File.WriteAt(p, off) + if err == nil { + waterMark := off + int64(n) + if waterMark > df.fileSize { + df.fileSize = waterMark + df.modTime = time.Now() + } + } + return +} + +func (df *DiskFile) Append(p []byte) (n int, err error) { + return df.WriteAt(p, df.fileSize) } func (df *DiskFile) Truncate(off int64) error { - return df.File.Truncate(off) + err := df.File.Truncate(off) + if err == nil { + df.fileSize = off + df.modTime = time.Now() + } + return err } func (df *DiskFile) Close() error { @@ -38,11 +70,7 @@ func (df *DiskFile) Close() error { } func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { - stat, e := df.File.Stat() - if e == nil { - return stat.Size(), stat.ModTime(), nil - } - return 0, time.Time{}, err + return df.fileSize, df.modTime, nil } func (df *DiskFile) Name() string { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9b2ab69fe..6de87c793 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "os" "path/filepath" @@ -19,6 +20,7 @@ import ( type DiskLocation struct { Directory string IdxDirectory string + DiskType types.DiskType MaxVolumeCount int OriginalMaxVolumeCount int MinFreeSpacePercent float32 @@ -32,7 +34,7 @@ type DiskLocation struct { isDiskSpaceLow bool } -func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation { +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation { dir = util.ResolvePath(dir) if idxDir == "" { idxDir = dir @@ -42,6 +44,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32 location := &DiskLocation{ Directory: dir, IdxDirectory: idxDir, + DiskType: diskType, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent, @@ -82,7 +85,7 @@ func getValidVolumeName(basename string) string { return "" } -func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { +func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool { basename := fileInfo.Name() if fileInfo.IsDir() { return false @@ -133,7 +136,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne return true } -func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) { task_queue := make(chan os.FileInfo, 10*concurrency) go func() { @@ -167,7 +170,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con } -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { l.concurrentLoadingVolumes(needleMapKind, 10) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) @@ -237,7 +240,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro return } -func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { +func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { if fileInfo, found := l.LocateVolume(vid); found { return l.loadExistingVolume(fileInfo, needleMapKind) } diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index d1237b40f..91c7d86a6 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -14,7 +14,7 @@ import ( ) var ( - re = regexp.MustCompile("\\.ec[0-9][0-9]") + re = regexp.MustCompile(`\.ec[0-9][0-9]`) ) func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { @@ -57,7 +57,7 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) { - ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId) + ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) if err != nil { if err == os.ErrNotExist { return os.ErrNotExist @@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) if err != nil { return fmt.Errorf("failed to create ec volume %d: %v", vid, err) } diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index bc86d9c04..47d3c6550 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -58,7 +58,7 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, return nil } - entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version) + entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version) if datSize < entryStopOffset { datSize = entryStopOffset } diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 74ed99198..2a57d85ef 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -2,6 +2,7 @@ package erasure_coding import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "path" "strconv" @@ -20,11 +21,12 @@ type EcVolumeShard struct { dir string ecdFile *os.File ecdFileSize int64 + DiskType types.DiskType } -func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { +func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { - v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId} + v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType} baseFileName := v.FileName() diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go index 63cc2c352..0d48bec02 100644 --- a/weed/storage/erasure_coding/ec_test.go +++ b/weed/storage/erasure_coding/ec_test.go @@ -93,7 +93,7 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) { data := make([]byte, size) - n, err := datFile.ReadAt(data, offset.ToAcutalOffset()) + n, err := datFile.ReadAt(data, offset.ToActualOffset()) if err != nil { return nil, fmt.Errorf("failed to ReadAt dat file: %v", err) } @@ -105,7 +105,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) { - intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size) + intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size) for i, interval := range intervals { if d, e := readOneInterval(interval, ecFiles); e != nil { diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 2183e43d6..85d6a5fc8 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -36,10 +36,11 @@ type EcVolume struct { Version needle.Version ecjFile *os.File ecjFileAccessLock sync.Mutex + diskType types.DiskType } -func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { - ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid} +func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { + ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType} dataBaseFileName := EcShardFileName(collection, dir, int(vid)) indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) @@ -191,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V m = &master_pb.VolumeEcShardInformationMessage{ Id: uint32(s.VolumeId), Collection: s.Collection, + DiskType: string(ev.diskType), } messages = append(messages, m) } @@ -211,7 +213,7 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle. shard := ev.Shards[0] // calculate the locations in the ec shards - intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version))) + intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version))) return } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 8ff65bb0f..3dd535e64 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -10,13 +10,15 @@ type EcVolumeInfo struct { VolumeId needle.VolumeId Collection string ShardBits ShardBits + DiskType string } -func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { +func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { return &EcVolumeInfo{ Collection: collection, VolumeId: vid, ShardBits: shardBits, + DiskType: diskType, } } @@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { VolumeId: ecInfo.VolumeId, Collection: ecInfo.Collection, ShardBits: ecInfo.ShardBits.Minus(other.ShardBits), + DiskType: ecInfo.DiskType, } return ret @@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb. Id: uint32(ecInfo.VolumeId), EcIndexBits: uint32(ecInfo.ShardBits), Collection: ecInfo.Collection, + DiskType: ecInfo.DiskType, } } diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go index fe45bf722..747ef4aab 100644 --- a/weed/storage/erasure_coding/ec_volume_test.go +++ b/weed/storage/erasure_coding/ec_volume_test.go @@ -35,16 +35,16 @@ func TestPositioning(t *testing.T) { needleId, _ := types.ParseNeedleId(test.needleId) offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") - fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size) + fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size) } needleId, _ := types.ParseNeedleId("0f087622") offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") - fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size) + fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size) var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3 - intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion))) + intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion))) for _, interval := range intervals { shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index e758a6fee..0f72bc0bb 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -161,7 +161,15 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) - _, err = r.ReadAt(dataSlice, offset) + var n int + n, err = r.ReadAt(dataSlice, offset) + if err != nil && int64(n) == dataSize { + err = nil + } + if err != nil { + fileSize, _, _ := r.GetStat() + println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) + } return dataSlice, err } diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go index 47582dd26..afcea5a05 100644 --- a/weed/storage/needle/needle_read_write_test.go +++ b/weed/storage/needle/needle_read_write_test.go @@ -48,7 +48,7 @@ func TestAppend(t *testing.T) { int64 : -9223372036854775808 to 9223372036854775807 */ - fileSize := int64(4294967295) + 10000 + fileSize := int64(4294967296) + 10000 tempFile.Truncate(fileSize) defer func() { tempFile.Close() diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 9f331267d..d35391f66 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -1,7 +1,6 @@ package storage import ( - "fmt" "io" "os" "sync" @@ -11,10 +10,10 @@ import ( . "github.com/chrislusf/seaweedfs/weed/storage/types" ) -type NeedleMapType int +type NeedleMapKind int const ( - NeedleMapInMemory NeedleMapType = iota + NeedleMapInMemory NeedleMapKind = iota NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer @@ -41,6 +40,7 @@ type baseNeedleMapper struct { indexFile *os.File indexFileAccessLock sync.Mutex + indexFileOffset int64 } func (nm *baseNeedleMapper) IndexFileSize() uint64 { @@ -56,11 +56,10 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size nm.indexFileAccessLock.Lock() defer nm.indexFileAccessLock.Unlock() - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot seek end of indexfile %s: %v", - nm.indexFile.Name(), err) + written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset) + if err == nil { + nm.indexFileOffset += int64(written) } - _, err := nm.indexFile.Write(bytes) return err } diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 415cd14dd..9716e9729 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -31,6 +31,11 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option generateLevelDbFile(dbFileName, indexFile) glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) } + if stat, err := indexFile.Stat(); err != nil { + glog.Fatalf("stat file %s: %v", indexFile.Name(), err) + } else { + m.indexFileOffset = stat.Size() + } glog.V(1).Infof("Opening %s...", dbFileName) if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index d0891dc98..1b58708c6 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { m: needle_map.NewCompactMap(), } nm.indexFile = file + stat, err := file.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", file.Name(), err) + } + nm.indexFileOffset = stat.Size() return nm } diff --git a/weed/storage/store.go b/weed/storage/store.go index ff28be47c..47829666a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -40,7 +40,7 @@ type Store struct { dataCenter string // optional informaton, overwriting master setting if exists rack string // optional information, overwriting master setting if exists connected bool - NeedleMapType NeedleMapType + NeedleMapKind NeedleMapKind NewVolumesChan chan master_pb.VolumeShortInformationMessage DeletedVolumesChan chan master_pb.VolumeShortInformationMessage NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage @@ -52,11 +52,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) { - s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { + s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) @@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume { } return nil } -func (s *Store) FindFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { max := 0 for _, location := range s.Locations { + if diskType != location.DiskType { + continue + } currentFreeCount := location.MaxVolumeCount - location.VolumesLen() currentFreeCount *= erasure_coding.DataShardsCount currentFreeCount -= location.EcVolumesLen() @@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(); location != nil { + if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { @@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind ReplicaPlacement: uint32(replicaPlacement.Byte()), Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), + DiskType: string(diskType), } return nil } else { @@ -169,6 +173,7 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) { ReadOnly: v.IsReadOnly(), Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), + DiskType: v.DiskType().String(), } s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() @@ -202,13 +207,13 @@ func (s *Store) GetRack() string { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage - maxVolumeCount := 0 + maxVolumeCounts := make(map[string]uint32) var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount) location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() @@ -280,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } return &master_pb.Heartbeat{ - Ip: s.Ip, - Port: uint32(s.Port), - PublicUrl: s.PublicUrl, - MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: NeedleIdToUint64(maxFileKey), - DataCenter: s.dataCenter, - Rack: s.rack, - Volumes: volumeMessages, - HasNoVolumes: len(volumeMessages) == 0, + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCounts: maxVolumeCounts, + MaxFileKey: NeedleIdToUint64(maxFileKey), + DataCenter: s.dataCenter, + Rack: s.rack, + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, } } @@ -362,7 +367,7 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error { func (s *Store) MountVolume(i needle.VolumeId) error { for _, location := range s.Locations { - if found := location.LoadVolume(i, s.NeedleMapType); found == true { + if found := location.LoadVolume(i, s.NeedleMapKind); found == true { glog.V(0).Infof("mount volume %d", i) v := s.findVolume(i) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ @@ -371,6 +376,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } return nil } @@ -390,6 +396,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { @@ -414,6 +421,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { if err := location.DeleteVolume(i); err == nil { @@ -463,6 +471,9 @@ func (s *Store) GetVolumeSizeLimit() uint64 { func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { volumeSizeLimit := s.GetVolumeSizeLimit() + if volumeSizeLimit == 0 { + return + } for _, diskLocation := range s.Locations { if diskLocation.OriginalMaxVolumeCount == 0 { currentMaxVolumeCount := diskLocation.MaxVolumeCount diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 853757ce3..a9b6a8ff3 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -58,6 +58,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er Id: uint32(vid), Collection: collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(location.DiskType), } return nil } else if err == os.ErrNotExist { @@ -82,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar Id: uint32(vid), Collection: ecShard.Collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(ecShard.DiskType), } for _, location := range s.Locations { @@ -131,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e return 0, ErrorDeleted } - glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals) + glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals) if len(intervals) > 1 { glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) @@ -144,7 +146,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e return 0, ErrorDeleted } - err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version) + err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("readbytes: %v", err) } diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index fcccbba7d..65ec53819 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -6,9 +6,9 @@ import ( ) type ReplicaPlacement struct { - SameRackCount int - DiffRackCount int - DiffDataCenterCount int + SameRackCount int `json:"node,omitempty"` + DiffRackCount int `json:"rack,omitempty"` + DiffDataCenterCount int `json:"dc,omitempty"` } func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go index d53147e21..5348d5b36 100644 --- a/weed/storage/types/offset_4bytes.go +++ b/weed/storage/types/offset_4bytes.go @@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset { return Uint32ToOffset(smaller) } -func (offset Offset) ToAcutalOffset() (actualOffset int64) { +func (offset Offset) ToActualOffset() (actualOffset int64) { return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize) } diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go index 05c6d2f39..b6181fc11 100644 --- a/weed/storage/types/offset_5bytes.go +++ b/weed/storage/types/offset_5bytes.go @@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset { } } -func (offset Offset) ToAcutalOffset() (actualOffset int64) { +func (offset Offset) ToActualOffset() (actualOffset int64) { return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize) } diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go new file mode 100644 index 000000000..c9b87d802 --- /dev/null +++ b/weed/storage/types/volume_disk_type.go @@ -0,0 +1,40 @@ +package types + +import ( + "strings" +) + +type DiskType string + +const ( + HardDriveType DiskType = "" + SsdType = "ssd" +) + +func ToDiskType(vt string) (diskType DiskType) { + vt = strings.ToLower(vt) + diskType = HardDriveType + switch vt { + case "", "hdd": + diskType = HardDriveType + case "ssd": + diskType = SsdType + default: + diskType = DiskType(vt) + } + return +} + +func (diskType DiskType) String() string { + if diskType == "" { + return "" + } + return string(diskType) +} + +func (diskType DiskType) ReadableString() string { + if diskType == "" { + return "hdd" + } + return string(diskType) +} diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 80a74c3e3..366449c53 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -25,7 +25,7 @@ type Volume struct { Collection string DataBackend backend.BackendStorageFile nm NeedleMapper - needleMapKind NeedleMapType + needleMapKind NeedleMapKind noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteLock sync.RWMutex @@ -50,7 +50,7 @@ type Volume struct { lastIoError error } -func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} @@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 { return v.nm.IndexFileSize() } +func (v *Volume) DiskType() types.DiskType { + return v.location.DiskType +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), + DiskType: string(v.location.DiskType), } volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index 9aeb10f69..82ea12a89 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -154,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()) if err != nil { - return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err) + return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err) } - _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength) if err != nil { - return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) + return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err) } return n.AppendAtNs, nil diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 00e04047f..b76933083 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" "os" @@ -58,7 +59,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) ( return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) } } else { - if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil { + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil { return lastAppendAtNs, err } } @@ -148,3 +149,18 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V } return n.AppendAtNs, err } + +func (v *Volume) checkIdxFile() error { + datFileSize, _, err := v.DataBackend.GetStat() + if err != nil { + return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err) + } + if datFileSize <= super_block.SuperBlockSize { + return nil + } + indexFileName := v.FileName(".idx") + if util.FileExists(indexFileName) { + return nil + } + return fmt.Errorf("idx file %s does not exists", indexFileName) +} diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index 313818cde..9c64c9682 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -14,6 +14,7 @@ type VolumeInfo struct { Size uint64 ReplicaPlacement *super_block.ReplicaPlacement Ttl *needle.TTL + DiskType string Collection string Version needle.Version FileCount int @@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er ModifiedAtSecond: m.ModifiedAtSecond, RemoteStorageName: m.RemoteStorageName, RemoteStorageKey: m.RemoteStorageKey, + DiskType: m.DiskType, } rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { @@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu } vi.ReplicaPlacement = rp vi.Ttl = needle.LoadTTLFromUint32(m.Ttl) + vi.DiskType = m.DiskType return vi, nil } @@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe ModifiedAtSecond: vi.ModifiedAtSecond, RemoteStorageName: vi.RemoteStorageName, RemoteStorageKey: vi.RemoteStorageKey, + DiskType: vi.DiskType, } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index a6efc630d..bff1055bb 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) { +func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = super_block.SuperBlock{} v.needleMapKind = needleMapKind @@ -22,7 +22,7 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) { alreadyHasSuperBlock := false hasLoadedVolume := false @@ -96,6 +96,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind v.dirIdx = v.dir } } + // check volume idx files + if err := v.checkIdxFile(); err != nil { + glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err) + } var indexFile *os.File if v.noWriteOrDelete { glog.V(0).Infoln("open to read file", v.FileName(".idx")) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index f28ee50e6..07376bc88 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -41,9 +41,9 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && !nv.Offset.IsZero() && nv.Size.IsValid() { oldNeedle := new(needle.Needle) - err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) + err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version()) if err != nil { - glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err) + glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err) return false } if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { @@ -113,7 +113,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan // check whether existing needle cookie matches nv, ok := v.nm.Get(n.Id) if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset()) + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset()) if existingNeedleReadErr != nil { err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) return @@ -136,7 +136,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan v.lastAppendAtNs = n.AppendAtNs // add to needle map - if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { + if !ok || uint64(nv.Offset.ToActualOffset()) < offset { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } @@ -179,7 +179,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU // check whether existing needle cookie matches nv, ok := v.nm.Get(n.Id) if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset()) + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset()) if existingNeedleReadErr != nil { err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) return @@ -201,7 +201,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU v.lastAppendAtNs = n.AppendAtNs // add to needle map - if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { + if !ok || uint64(nv.Offset.ToActualOffset()) < offset { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } @@ -303,9 +303,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro if readSize == 0 { return 0, nil } - err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version()) + err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) if err == needle.ErrorSizeMismatch && OffsetSize == 4 { - err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) } v.checkReadWriteError(err) if err != nil { @@ -410,7 +410,7 @@ type VolumeFileScanner interface { } func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, - needleMapKind NeedleMapType, + needleMapKind NeedleMapKind, volumeFileScanner VolumeFileScanner) (err error) { var v *Volume if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 5884eca87..0ee1e61c6 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -280,13 +280,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI //updated needle if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() { //even the needle cache in memory is hit, the need_bytes is correct - glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) + glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size) var needleBytes []byte - needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) + needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version()) if err != nil { - return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err) + return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err) } - dst.Write(needleBytes) + dstDatBackend.Append(needleBytes) util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) } else { //deleted needle //fakeDelNeedle 's default Data field is nil @@ -339,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in } nv, ok := scanner.v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() { + if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() { if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } @@ -422,7 +422,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str } n := new(needle.Needle) - err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version) + err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version) if err != nil { return nil } |
