| Field | Value | Date |
|---|---|---|
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
| commit | 821c46edf10097200b986bd17dc01d3991cf57ff (patch) | |
| tree | ca181a9ef3c2f7e45cf0dbb40373b87717a9a636 /weed/storage | |
| parent | 15da5834e1a33d060924740ba195f6bcd79f2af2 (diff) | |
| parent | a6e8d606b47e5f3e8cd8a57d2769d6f1404fbc8f (diff) | |
| download | seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.tar.xz, seaweedfs-821c46edf10097200b986bd17dc01d3991cf57ff.zip | |
Merge branch 'master' into support_ssd_volume
Diffstat (limited to 'weed/storage')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | weed/storage/backend/backend.go | 4 |
| -rw-r--r-- | weed/storage/disk_location.go | 8 |
| -rw-r--r-- | weed/storage/erasure_coding/ec_decoder.go | 2 |
| -rw-r--r-- | weed/storage/erasure_coding/ec_test.go | 4 |
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 2 |
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume_test.go | 6 |
| -rw-r--r-- | weed/storage/needle/needle_parse_upload.go | 4 |
| -rw-r--r-- | weed/storage/needle_map.go | 4 |
| -rw-r--r-- | weed/storage/store.go | 12 |
| -rw-r--r-- | weed/storage/store_ec.go | 4 |
| -rw-r--r-- | weed/storage/types/offset_4bytes.go | 2 |
| -rw-r--r-- | weed/storage/types/offset_5bytes.go | 2 |
| -rw-r--r-- | weed/storage/volume.go | 4 |
| -rw-r--r-- | weed/storage/volume_backup.go | 8 |
| -rw-r--r-- | weed/storage/volume_checking.go | 2 |
| -rw-r--r-- | weed/storage/volume_loading.go | 4 |
| -rw-r--r-- | weed/storage/volume_read_write.go | 18 |
| -rw-r--r-- | weed/storage/volume_vacuum.go | 10 |
18 files changed, 50 insertions, 50 deletions
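
Most of the churn in this merge is mechanical: the `NeedleMapType` enum is renamed to `NeedleMapKind` throughout the volume, store, and loading code, the misspelled `Offset.ToAcutalOffset()` accessor becomes `ToActualOffset()`, and the master-side `backend.LoadConfiguration` now takes a `*util.ViperProxy` instead of a `*viper.Viper`. To make the offset accessor concrete, here is a standalone sketch of the 4-byte decoding shown in the `offset_4bytes.go` hunk below; the type, constant, and `main` wrapper are local stand-ins for illustration rather than the actual `weed/storage/types` package, and the padding size of 8 is an assumption.

```go
package main

import "fmt"

// Standalone sketch of the 4-byte offset decoding changed in
// weed/storage/types/offset_4bytes.go; all names here are local stand-ins.
const needlePaddingSize = 8 // assumed padding granularity, for illustration only

type offset4 struct {
	b0, b1, b2, b3 byte // little-endian count of padding units
}

// toActualOffset mirrors the renamed ToActualOffset(): rebuild the
// little-endian integer and scale it back up to a byte offset.
func (o offset4) toActualOffset() int64 {
	return (int64(o.b0) + int64(o.b1)<<8 + int64(o.b2)<<16 + int64(o.b3)<<24) * needlePaddingSize
}

func main() {
	o := offset4{b0: 0x10, b1: 0x02} // stored value 0x0210 = 528 padding units
	fmt.Println(o.toActualOffset())  // 528 * 8 = 4224 bytes into the .dat file
}
```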
```diff
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index daab29621..b8b883be6 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -1,6 +1,7 @@
 package backend
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"io"
 	"os"
 	"strings"
@@ -9,7 +10,6 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
-	"github.com/spf13/viper"
 )
 
 type BackendStorageFile interface {
@@ -45,7 +45,7 @@ var (
 )
 
 // used by master to load remote storage configurations
-func LoadConfiguration(config *viper.Viper) {
+func LoadConfiguration(config *util.ViperProxy) {
 
 	StorageBackendPrefix := "storage.backend"
 
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index ce42232a7..a7be3a559 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -84,7 +84,7 @@ func getValidVolumeName(basename string) string {
 	return ""
 }
 
-func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
+func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool {
 	basename := fileInfo.Name()
 	if fileInfo.IsDir() {
 		return false
@@ -135,7 +135,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
 	return true
 }
 
-func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
+func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
 	task_queue := make(chan os.FileInfo, 10*concurrency)
 
 	go func() {
@@ -169,7 +169,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
 
 }
 
-func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
 	l.concurrentLoadingVolumes(needleMapKind, 10)
 
 	glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
@@ -239,7 +239,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e erro
 	return
 }
 
-func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
+func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
 	if fileInfo, found := l.LocateVolume(vid); found {
 		return l.loadExistingVolume(fileInfo, needleMapKind)
 	}
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index bc86d9c04..47d3c6550 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -58,7 +58,7 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64,
 			return nil
 		}
 
-		entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+		entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
 		if datSize < entryStopOffset {
 			datSize = entryStopOffset
 		}
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 63cc2c352..0d48bec02 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -93,7 +93,7 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
 
 func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
 	data := make([]byte, size)
-	n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
+	n, err := datFile.ReadAt(data, offset.ToActualOffset())
 	if err != nil {
 		return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
 	}
@@ -105,7 +105,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte
 
 func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
 
-	intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+	intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
 
 	for i, interval := range intervals {
 		if d, e := readOneInterval(interval, ecFiles); e != nil {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 2183e43d6..a9d08ed0e 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -211,7 +211,7 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
 	shard := ev.Shards[0]
 
 	// calculate the locations in the ec shards
-	intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version)))
+	intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
 
 	return
 }
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
index fe45bf722..747ef4aab 100644
--- a/weed/storage/erasure_coding/ec_volume_test.go
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -35,16 +35,16 @@ func TestPositioning(t *testing.T) {
 		needleId, _ := types.ParseNeedleId(test.needleId)
 		offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
 		assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
-		fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
+		fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
 	}
 
 	needleId, _ := types.ParseNeedleId("0f087622")
 	offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
 	assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
-	fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
+	fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
 
 	var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
-	intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
+	intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
 
 	for _, interval := range intervals {
 		shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 8f457be1d..7201503f1 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -193,9 +193,9 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
 			mtype = contentType
 		}
 
-		pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
-		// pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
 	}
+	pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+	// pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
 
 	return
 }
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 9f331267d..5b41286ea 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -11,10 +11,10 @@ import (
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 )
 
-type NeedleMapType int
+type NeedleMapKind int
 
 const (
-	NeedleMapInMemory NeedleMapType = iota
+	NeedleMapInMemory NeedleMapKind = iota
 	NeedleMapLevelDb       // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
 	NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
 	NeedleMapLevelDbLarge  // large memory footprint, 12MB total, 4write buffer, 8 block buffer
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 470ce0c18..482e8998c 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -40,7 +40,7 @@ type Store struct {
 	dataCenter         string // optional informaton, overwriting master setting if exists
 	rack               string // optional information, overwriting master setting if exists
 	connected          bool
-	NeedleMapType      NeedleMapType
+	NeedleMapKind      NeedleMapKind
 	NewVolumesChan     chan master_pb.VolumeShortInformationMessage
 	DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
 	NewEcShardsChan    chan master_pb.VolumeEcShardInformationMessage
@@ -52,8 +52,8 @@ func (s *Store) String() (str string) {
 	return
 }
 
-func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType, diskTypes []DiskType) (s *Store) {
-	s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
+func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
+	s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
 	s.Locations = make([]*DiskLocation, 0)
 	for i := 0; i < len(dirnames); i++ {
 		location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
@@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
 	return
 }
 
-func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
+func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
 	rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
 	if e != nil {
 		return e
@@ -117,7 +117,7 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
 	}
 	return ret
 }
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
 	if s.findVolume(vid) != nil {
 		return fmt.Errorf("Volume Id %d already exists!", vid)
 	}
@@ -373,7 +373,7 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
 
 func (s *Store) MountVolume(i needle.VolumeId) error {
 	for _, location := range s.Locations {
-		if found := location.LoadVolume(i, s.NeedleMapType); found == true {
+		if found := location.LoadVolume(i, s.NeedleMapKind); found == true {
 			glog.V(0).Infof("mount volume %d", i)
 			v := s.findVolume(i)
 			s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 853757ce3..ab4e96634 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -131,7 +131,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
 			return 0, ErrorDeleted
 		}
 
-		glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
+		glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
 
 		if len(intervals) > 1 {
 			glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
@@ -144,7 +144,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
 			return 0, ErrorDeleted
 		}
 
-		err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
+		err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
 		if err != nil {
 			return 0, fmt.Errorf("readbytes: %v", err)
 		}
diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go
index d53147e21..5348d5b36 100644
--- a/weed/storage/types/offset_4bytes.go
+++ b/weed/storage/types/offset_4bytes.go
@@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset {
 	return Uint32ToOffset(smaller)
 }
 
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
 	return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize)
 }
 
diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go
index 05c6d2f39..b6181fc11 100644
--- a/weed/storage/types/offset_5bytes.go
+++ b/weed/storage/types/offset_5bytes.go
@@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset {
 	}
 }
 
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
 	return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize)
 }
 
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 1905a85a5..afa0fbf28 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -25,7 +25,7 @@ type Volume struct {
 	Collection       string
 	DataBackend      backend.BackendStorageFile
 	nm               NeedleMapper
-	needleMapKind    NeedleMapType
+	needleMapKind    NeedleMapKind
 	noWriteOrDelete  bool // if readonly, either noWriteOrDelete or noWriteCanDelete
 	noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
 	noWriteLock      sync.RWMutex
@@ -50,7 +50,7 @@ type Volume struct {
 	lastIoError error
 }
 
-func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
 	// if replicaPlacement is nil, the superblock will be loaded from disk
 	v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
 
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 9aeb10f69..82ea12a89 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -154,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
 
 func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
 
-	n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
+	n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
 	if err != nil {
-		return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err)
+		return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
 	}
-	_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength)
+	_, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
 	if err != nil {
-		return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
+		return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
 	}
 
 	return n.AppendAtNs, nil
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 00e04047f..8d63c39c1 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -58,7 +58,7 @@ func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (
 			return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
 		}
 	} else {
-		if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
+		if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
 			return lastAppendAtNs, err
 		}
 	}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index a6efc630d..52a50a98c 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -14,7 +14,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
 	v.SuperBlock = super_block.SuperBlock{}
 	v.needleMapKind = needleMapKind
@@ -22,7 +22,7 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
 	return
 }
 
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
 	alreadyHasSuperBlock := false
 	hasLoadedVolume := false
 
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index f28ee50e6..07376bc88 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -41,9 +41,9 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
 	nv, ok := v.nm.Get(n.Id)
 	if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
 		oldNeedle := new(needle.Needle)
-		err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+		err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
 		if err != nil {
-			glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err)
+			glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
 			return false
 		}
 		if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
@@ -113,7 +113,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
 	// check whether existing needle cookie matches
 	nv, ok := v.nm.Get(n.Id)
 	if ok {
-		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
+		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
 		if existingNeedleReadErr != nil {
 			err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
 			return
@@ -136,7 +136,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
 	v.lastAppendAtNs = n.AppendAtNs
 
 	// add to needle map
-	if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
+	if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
 		if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
 			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
 		}
@@ -179,7 +179,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
 	// check whether existing needle cookie matches
 	nv, ok := v.nm.Get(n.Id)
 	if ok {
-		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
+		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
 		if existingNeedleReadErr != nil {
 			err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
 			return
@@ -201,7 +201,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
 	v.lastAppendAtNs = n.AppendAtNs
 
 	// add to needle map
-	if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
+	if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
 		if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
 			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
 		}
@@ -303,9 +303,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
 	if readSize == 0 {
 		return 0, nil
 	}
-	err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
+	err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
 	if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
-		err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+		err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
 	}
 	v.checkReadWriteError(err)
 	if err != nil {
@@ -410,7 +410,7 @@ type VolumeFileScanner interface {
 }
 
 func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
-	needleMapKind NeedleMapType,
+	needleMapKind NeedleMapKind,
 	volumeFileScanner VolumeFileScanner) (err error) {
 	var v *Volume
 	if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 5884eca87..c17c9c937 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -280,11 +280,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		//updated needle
 		if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
 			//even the needle cache in memory is hit, the need_bytes is correct
-			glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
+			glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size)
 			var needleBytes []byte
-			needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+			needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
 			if err != nil {
-				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
+				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
 			}
 			dst.Write(needleBytes)
 			util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
@@ -339,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
 	}
 	nv, ok := scanner.v.nm.Get(n.Id)
 	glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
-	if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
+	if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
 		if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
 			return fmt.Errorf("cannot put needle: %s", err)
 		}
@@ -422,7 +422,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
 		}
 
 		n := new(needle.Needle)
-		err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
+		err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
 		if err != nil {
 			return nil
 		}
```
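
For readers tracking the `NeedleMapKind` rename through the signatures above, the enum itself (taken from the `needle_map.go` hunk) now reads as follows. The `kindName` helper is hypothetical, added only to show that call sites such as `DiskLocation.loadExistingVolumes` receive the kind as a plain value; it is not part of SeaweedFS.

```go
package storage

// NeedleMapKind selects how a volume's needle map is backed
// (renamed from NeedleMapType in this merge).
type NeedleMapKind int

const (
	NeedleMapInMemory      NeedleMapKind = iota
	NeedleMapLevelDb       // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
	NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
	NeedleMapLevelDbLarge  // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)

// kindName is a hypothetical helper (not in the SeaweedFS codebase) that
// maps each kind to a label, illustrating a typical switch over the enum.
func kindName(k NeedleMapKind) string {
	switch k {
	case NeedleMapInMemory:
		return "memory"
	case NeedleMapLevelDb:
		return "leveldb"
	case NeedleMapLevelDbMedium:
		return "leveldbMedium"
	case NeedleMapLevelDbLarge:
		return "leveldbLarge"
	default:
		return "unknown"
	}
}
```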
