author     ustuzhanin <55892859+ustuzhanin@users.noreply.github.com>  2020-10-02 22:47:25 +0500
committer  GitHub <noreply@github.com>  2020-10-02 22:47:25 +0500
commit     3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch)
tree       e0b42e531d18136d9e272258187a305690ee2b4d
parent     cbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff)
parent     9ab98fa912814686b3035a97b5173c1628fbc0fc (diff)
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/storage')
-rw-r--r--  weed/storage/backend/volume_create.go | 2
-rw-r--r--  weed/storage/disk_location_ec.go | 4
-rw-r--r--  weed/storage/erasure_coding/ec_decoder.go | 6
-rw-r--r--  weed/storage/erasure_coding/ec_encoder.go | 2
-rw-r--r--  weed/storage/erasure_coding/ec_locate.go | 10
-rw-r--r--  weed/storage/erasure_coding/ec_shard.go | 8
-rw-r--r--  weed/storage/erasure_coding/ec_test.go | 8
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go | 8
-rw-r--r--  weed/storage/erasure_coding/ec_volume_delete.go | 2
-rw-r--r--  weed/storage/erasure_coding/ec_volume_test.go | 2
-rw-r--r--  weed/storage/idx/walk.go | 12
-rw-r--r--  weed/storage/needle/file_id.go | 2
-rw-r--r--  weed/storage/needle/needle.go | 5
-rw-r--r--  weed/storage/needle/needle_parse_upload.go | 11
-rw-r--r--  weed/storage/needle/needle_read_write.go | 34
-rw-r--r--  weed/storage/needle/volume_ttl_test.go | 5
-rw-r--r--  weed/storage/needle_map.go | 4
-rw-r--r--  weed/storage/needle_map/compact_map.go | 27
-rw-r--r--  weed/storage/needle_map/compact_map_perf_test.go | 3
-rw-r--r--  weed/storage/needle_map/compact_map_test.go | 20
-rw-r--r--  weed/storage/needle_map/memdb.go | 13
-rw-r--r--  weed/storage/needle_map/needle_value.go | 6
-rw-r--r--  weed/storage/needle_map/needle_value_map.go | 4
-rw-r--r--  weed/storage/needle_map_leveldb.go | 23
-rw-r--r--  weed/storage/needle_map_memory.go | 8
-rw-r--r--  weed/storage/needle_map_metric.go | 18
-rw-r--r--  weed/storage/needle_map_metric_test.go | 2
-rw-r--r--  weed/storage/needle_map_sorted_file.go | 4
-rw-r--r--  weed/storage/store.go | 27
-rw-r--r--  weed/storage/store_ec.go | 5
-rw-r--r--  weed/storage/store_vacuum.go | 5
-rw-r--r--  weed/storage/types/needle_types.go | 23
-rw-r--r--  weed/storage/volume.go | 5
-rw-r--r--  weed/storage/volume_backup.go | 2
-rw-r--r--  weed/storage/volume_checking.go | 33
-rw-r--r--  weed/storage/volume_read_write.go | 63
-rw-r--r--  weed/storage/volume_super_block.go | 2
-rw-r--r--  weed/storage/volume_vacuum.go | 8
-rw-r--r--  weed/storage/volume_vacuum_test.go | 6
39 files changed, 265 insertions, 167 deletions
diff --git a/weed/storage/backend/volume_create.go b/weed/storage/backend/volume_create.go
index abb1f7238..d4bd8e40f 100644
--- a/weed/storage/backend/volume_create.go
+++ b/weed/storage/backend/volume_create.go
@@ -14,7 +14,7 @@ func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32
return nil, e
}
if preallocate > 0 {
- glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
+ glog.V(2).Infof("Preallocated disk space for %s is not supported", fileName)
}
return NewDiskFile(file), nil
}
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 72d3e2b3e..07fab96d9 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -3,6 +3,7 @@ package storage
import (
"fmt"
"io/ioutil"
+ "os"
"path"
"regexp"
"sort"
@@ -58,6 +59,9 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId)
if err != nil {
+ if err == os.ErrNotExist {
+ return os.ErrNotExist
+ }
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
}
l.ecVolumesLock.Lock()
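Note: the hunk above starts reporting a missing shard file as the bare os.ErrNotExist sentinel instead of a wrapped error, so callers such as MountEcShards (later in this diff) can skip that disk location and try the next one. A minimal standalone sketch of the pattern, with loadShard as a hypothetical stand-in for NewEcVolumeShard:

package main

import (
	"fmt"
	"os"
)

// loadShard is a stand-in for NewEcVolumeShard: it opens a shard file and
// reports a missing file with the bare os.ErrNotExist sentinel.
func loadShard(path string) error {
	f, err := os.OpenFile(path, os.O_RDONLY, 0644)
	if err != nil {
		if os.IsNotExist(err) {
			return os.ErrNotExist // let callers compare with == os.ErrNotExist
		}
		return fmt.Errorf("cannot read ec shard %s: %v", path, err)
	}
	return f.Close()
}

func main() {
	if err := loadShard("/tmp/does-not-exist.ec00"); err == os.ErrNotExist {
		fmt.Println("shard absent on this disk location, trying the next one")
	}
}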
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index 99bcb6ca5..795a7d523 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -52,9 +52,9 @@ func FindDatFileSize(baseFileName string) (datSize int64, err error) {
return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
}
- err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return nil
}
@@ -88,7 +88,7 @@ func readEcVolumeVersion(baseFileName string) (version needle.Version, err error
}
-func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 5f0f20284..34b639407 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -294,7 +294,7 @@ func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
defer indexFile.Close()
cm := needle_map.NewMemDb()
- err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
} else {
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index 562966f8f..19eba6235 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -1,14 +1,18 @@
package erasure_coding
+import (
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
type Interval struct {
BlockIndex int
InnerBlockOffset int64
- Size uint32
+ Size types.Size
IsLargeBlock bool
LargeBlockRowsCount int
}
-func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
// adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
@@ -32,7 +36,7 @@ func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
intervals = append(intervals, interval)
return
}
- interval.Size = uint32(blockRemaining)
+ interval.Size = types.Size(blockRemaining)
intervals = append(intervals, interval)
size -= interval.Size
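For context, the arithmetic LocateData performs is essentially a split of the requested byte range across fixed-size blocks; a simplified sketch that ignores the large/small block distinction and the Size type could look like this:

package main

import "fmt"

// interval mirrors the shape of erasure_coding.Interval for a single block size.
type interval struct {
	blockIndex       int
	innerBlockOffset int64
	size             int64
}

// splitIntoBlocks cuts a read of `size` bytes starting at `offset` into
// per-block pieces, the same looping pattern LocateData uses.
func splitIntoBlocks(offset, size, blockSize int64) (out []interval) {
	for size > 0 {
		idx := int(offset / blockSize)
		inner := offset % blockSize
		remaining := blockSize - inner
		if size <= remaining {
			out = append(out, interval{idx, inner, size})
			return
		}
		out = append(out, interval{idx, inner, remaining})
		offset += remaining
		size -= remaining
	}
	return
}

func main() {
	fmt.Println(splitIntoBlocks(1000, 300, 1024)) // [{0 1000 24} {1 0 276}]
}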
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index 47e6d3d1e..74ed99198 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -5,6 +5,7 @@ import (
"os"
"path"
"strconv"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -29,11 +30,14 @@ func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, sha
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
+ if e == os.ErrNotExist || strings.Contains(e.Error(), "no such file or directory") {
+ return nil, os.ErrNotExist
+ }
+ return nil, fmt.Errorf("cannot read ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), e)
}
ecdFi, statErr := v.ecdFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
+ return nil, fmt.Errorf("can not stat ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), statErr)
}
v.ecdFileSize = ecdFi.Size()
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 92b83cdc8..63cc2c352 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -71,7 +71,7 @@ func validateFiles(baseFileName string) error {
return nil
}
-func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) error {
+func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) error {
data, err := readDatFile(datFile, offset, size)
if err != nil {
@@ -90,7 +90,7 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
return nil
}
-func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, error) {
+func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
data := make([]byte, size)
n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
@@ -103,7 +103,7 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
return data, nil
}
-func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
+func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
@@ -140,7 +140,7 @@ func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err er
return
}
-func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size uint32) (data []byte, err error) {
+func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size types.Size) (data []byte, err error) {
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
if err != nil {
return nil, fmt.Errorf("failed to create encoder: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index eef53765f..785f33ec4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -187,7 +187,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
return
}
-func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
+func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
// find the needle from ecx file
offset, size, err = ev.FindNeedleFromEcx(needleId)
@@ -198,16 +198,16 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
-func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 822a9e923..a7f8c24a3 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -12,7 +12,7 @@ import (
var (
MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
- util.Uint32toBytes(b, types.TombstoneFileSize)
+ types.SizeToBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
return fmt.Errorf("sorted needle write error: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
index 9a3b1e644..fe45bf722 100644
--- a/weed/storage/erasure_coding/ec_volume_test.go
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -44,7 +44,7 @@ func TestPositioning(t *testing.T) {
fmt.Printf("offset: %d size: %d\n", offset.ToAcutalOffset(), size)
var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
- intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, needle.CurrentVersion)))
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToAcutalOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
for _, interval := range intervals {
shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
diff --git a/weed/storage/idx/walk.go b/weed/storage/idx/walk.go
index 44140d142..5215d3c4f 100644
--- a/weed/storage/idx/walk.go
+++ b/weed/storage/idx/walk.go
@@ -5,21 +5,23 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
// walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function
-func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
var readerOffset int64
bytes := make([]byte, types.NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
+ if count == 0 && e == io.EOF {
+ return nil
+ }
glog.V(3).Infof("readerOffset %d count %d err: %v", readerOffset, count, e)
readerOffset += int64(count)
var (
key types.NeedleId
offset types.Offset
- size uint32
+ size types.Size
i int
)
@@ -40,10 +42,10 @@ func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offse
return e
}
-func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size uint32) {
+func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size types.Size) {
key = types.BytesToNeedleId(bytes[:types.NeedleIdSize])
offset = types.BytesToOffset(bytes[types.NeedleIdSize : types.NeedleIdSize+types.OffsetSize])
- size = util.BytesToUint32(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize])
+ size = types.BytesToSize(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize])
return
}
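The first added check above makes WalkIndexFile treat an empty index file (ReadAt returning 0 with io.EOF) as "no entries" instead of surfacing the EOF as an error. A compact sketch of that read loop over fixed-size records, with entrySize standing in for types.NeedleMapEntrySize:

package main

import (
	"bytes"
	"fmt"
	"io"
)

// walkEntries reads fixed-size records with ReadAt and stops cleanly on EOF;
// an empty reader yields zero callbacks and a nil error.
func walkEntries(r io.ReaderAt, entrySize int, fn func(entry []byte) error) error {
	buf := make([]byte, entrySize*1024)
	var off int64
	for {
		n, err := r.ReadAt(buf, off)
		if n == 0 && err == io.EOF {
			return nil // empty index, or nothing left to read
		}
		for i := 0; i+entrySize <= n; i += entrySize {
			if e := fn(buf[i : i+entrySize]); e != nil {
				return e
			}
		}
		off += int64(n)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	err := walkEntries(bytes.NewReader(nil), 16, func([]byte) error { return nil })
	fmt.Println(err) // <nil>
}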
diff --git a/weed/storage/needle/file_id.go b/weed/storage/needle/file_id.go
index 5dabb0f25..6055bdd1c 100644
--- a/weed/storage/needle/file_id.go
+++ b/weed/storage/needle/file_id.go
@@ -66,7 +66,7 @@ func formatNeedleIdCookie(key NeedleId, cookie Cookie) string {
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], cookie)
nonzero_index := 0
- for ; bytes[nonzero_index] == 0; nonzero_index++ {
+ for ; bytes[nonzero_index] == 0 && nonzero_index < NeedleIdSize; nonzero_index++ {
}
return hex.EncodeToString(bytes[nonzero_index:])
}
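The loop above strips leading zero bytes from the key portion of a file id before hex-encoding. A bounds-checked version of the same trim, sketched outside the diff with idLen standing in for NeedleIdSize:

package main

import (
	"encoding/hex"
	"fmt"
)

// trimLeadingZeros drops leading zero bytes of the id portion (the first idLen
// bytes) while always keeping the trailing cookie bytes; the index is checked
// before the byte is read.
func trimLeadingZeros(b []byte, idLen int) []byte {
	i := 0
	for i < idLen && b[i] == 0 {
		i++
	}
	return b[i:]
}

func main() {
	// 8-byte id with leading zeros, followed by a 4-byte cookie.
	b := []byte{0, 0, 0, 0, 0, 0, 0x01, 0x2c, 0xde, 0xad, 0xbe, 0xef}
	fmt.Println(hex.EncodeToString(trimLeadingZeros(b, 8))) // 012cdeadbeef
}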
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index 150d6ee4b..34d29ab6e 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -24,7 +24,7 @@ const (
type Needle struct {
Cookie Cookie `comment:"random number to mitigate brute force lookups"`
Id NeedleId `comment:"needle id"`
- Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
+ Size Size `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
DataSize uint32 `comment:"Data size"` //version2
Data []byte `comment:"The actual file data"`
@@ -48,7 +48,7 @@ func (n *Needle) String() (str string) {
return
}
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
n = new(Needle)
pu, e := ParseUpload(r, sizeLimit)
if e != nil {
@@ -58,6 +58,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit
originalSize = pu.OriginalDataSize
n.LastModified = pu.ModifiedTime
n.Ttl = pu.Ttl
+ contentMd5 = pu.ContentMd5
if len(pu.FileName) < 256 {
n.Name = []byte(pu.FileName)
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index dd678f87f..4d244046e 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -29,6 +29,7 @@ type ParsedUpload struct {
Ttl *TTL
IsChunkedFile bool
UncompressedData []byte
+ ContentMd5 string
}
func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
@@ -83,11 +84,13 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
}
}
+ // md5
+ h := md5.New()
+ h.Write(pu.UncompressedData)
+ pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
- h := md5.New()
- h.Write(pu.UncompressedData)
- if receivedChecksum := base64.StdEncoding.EncodeToString(h.Sum(nil)); expectedChecksum != receivedChecksum {
- e = fmt.Errorf("Content-MD5 did not match md5 of file data [%s] != [%s]", expectedChecksum, receivedChecksum)
+ if expectedChecksum != pu.ContentMd5 {
+ e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData))
return
}
}
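The upload path now always computes the base64-encoded MD5 of the uncompressed payload, stores it in ParsedUpload.ContentMd5, and only then compares it against an optional Content-MD5 request header. A standalone sketch of that check:

package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// contentMd5 returns the same base64(md5(data)) string the parser now stores.
func contentMd5(data []byte) string {
	h := md5.New()
	h.Write(data)
	return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

// verifyContentMd5 mirrors the header check: an empty expected value skips
// verification, a mismatch reports both digests and the payload size.
func verifyContentMd5(expected string, data []byte) (string, error) {
	got := contentMd5(data)
	if expected != "" && expected != got {
		return got, fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expected, got, len(data))
	}
	return got, nil
}

func main() {
	data := []byte("hello seaweedfs")
	sum, _ := verifyContentMd5("", data)
	fmt.Println(sum)
}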
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 575a72e40..89fc85b0d 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -28,7 +28,7 @@ func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
-func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) {
+func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) {
writeBytes := make([]byte, 0)
@@ -37,8 +37,8 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
- n.Size = uint32(len(n.Data))
- util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+ n.Size = Size(len(n.Data))
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
size := n.Size
actualSize := NeedleHeaderSize + int64(n.Size)
writeBytes = append(writeBytes, header...)
@@ -58,12 +58,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
}
n.DataSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Mime))
if n.DataSize > 0 {
- n.Size = 4 + n.DataSize + 1
+ n.Size = 4 + Size(n.DataSize) + 1
if n.HasName() {
- n.Size = n.Size + 1 + uint32(n.NameSize)
+ n.Size = n.Size + 1 + Size(n.NameSize)
}
if n.HasMime() {
- n.Size = n.Size + 1 + uint32(n.MimeSize)
+ n.Size = n.Size + 1 + Size(n.MimeSize)
}
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
@@ -72,12 +72,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
n.Size = n.Size + TtlBytesLength
}
if n.HasPairs() {
- n.Size += 2 + uint32(n.PairsSize)
+ n.Size += 2 + Size(n.PairsSize)
}
} else {
n.Size = 0
}
- util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
if n.DataSize > 0 {
util.Uint32toBytes(header[0:4], n.DataSize)
@@ -119,13 +119,13 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
}
- return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil
+ return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil
}
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.BackendStorageFile, off int64) {
@@ -154,7 +154,7 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return offset, size, actualSize, err
}
-func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
+func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
@@ -165,7 +165,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, ver
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
-func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
n.ParseNeedleHeader(bytes)
if n.Size != size {
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
@@ -195,7 +195,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers
}
// ReadData hydrates the needle from the file, with only n.Id is set.
-func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size, version)
if err != nil {
return err
@@ -206,7 +206,7 @@ func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint3
func (n *Needle) ParseNeedleHeader(bytes []byte) {
n.Cookie = BytesToCookie(bytes[0:CookieSize])
n.Id = BytesToNeedleId(bytes[CookieSize : CookieSize+NeedleIdSize])
- n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize])
+ n.Size = BytesToSize(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize])
}
func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
@@ -288,7 +288,7 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6
return
}
-func PaddingLength(needleSize uint32, version Version) uint32 {
+func PaddingLength(needleSize Size, version Version) Size {
if version == Version3 {
// this is same value as version2, but just listed here for clarity
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize + TimestampSize) % NeedlePaddingSize)
@@ -296,7 +296,7 @@ func PaddingLength(needleSize uint32, version Version) uint32 {
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize) % NeedlePaddingSize)
}
-func NeedleBodyLength(needleSize uint32, version Version) int64 {
+func NeedleBodyLength(needleSize Size, version Version) int64 {
if version == Version3 {
return int64(needleSize) + NeedleChecksumSize + TimestampSize + int64(PaddingLength(needleSize, version))
}
@@ -390,6 +390,6 @@ func (n *Needle) SetHasPairs() {
n.Flags = n.Flags | FlagHasPairs
}
-func GetActualSize(size uint32, version Version) int64 {
+func GetActualSize(size Size, version Version) int64 {
return NeedleHeaderSize + NeedleBodyLength(size, version)
}
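Since the on-disk size bookkeeping above now flows through the Size type, it may help to see the alignment arithmetic in isolation. A sketch of PaddingLength's formula for version 3, assuming the usual constants (16-byte header, 4-byte checksum, 8-byte timestamp, 8-byte alignment):

package main

import "fmt"

// paddingLength computes how many bytes pad a needle record to the next
// 8-byte boundary; like the original it returns 8 (a full padding block)
// when the record is already aligned.
func paddingLength(needleSize int64) int64 {
	const (
		headerSize    = 16 // cookie(4) + needle id(8) + size(4)
		checksumSize  = 4
		timestampSize = 8
		paddingSize   = 8
	)
	return paddingSize - ((headerSize + needleSize + checksumSize + timestampSize) % paddingSize)
}

func main() {
	for _, sz := range []int64{0, 1, 5, 100} {
		fmt.Println(sz, "->", paddingLength(sz))
	}
}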
diff --git a/weed/storage/needle/volume_ttl_test.go b/weed/storage/needle/volume_ttl_test.go
index 0afebebf5..f75453593 100644
--- a/weed/storage/needle/volume_ttl_test.go
+++ b/weed/storage/needle/volume_ttl_test.go
@@ -30,6 +30,11 @@ func TestTTLReadWrite(t *testing.T) {
t.Errorf("5d ttl:%v", ttl)
}
+ ttl, _ = ReadTTL("50d")
+ if ttl.Minutes() != 50*24*60 {
+ t.Errorf("50d ttl:%v", ttl)
+ }
+
ttl, _ = ReadTTL("5w")
if ttl.Minutes() != 5*7*24*60 {
t.Errorf("5w ttl:%v", ttl)
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 8962e78cb..e91856dfe 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -19,7 +19,7 @@ const (
)
type NeedleMapper interface {
- Put(key NeedleId, offset Offset, size uint32) error
+ Put(key NeedleId, offset Offset, size Size) error
Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
Delete(key NeedleId, offset Offset) error
Close()
@@ -48,7 +48,7 @@ func (nm *baseNeedleMapper) IndexFileSize() uint64 {
return 0
}
-func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error {
+func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error {
bytes := needle_map.ToBytes(key, offset, size)
nm.indexFileAccessLock.Lock()
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 76783d0b0..2b1a471bc 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -18,7 +18,7 @@ const SectionalNeedleIdLimit = 1<<32 - 1
type SectionalNeedleValue struct {
Key SectionalNeedleId
OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
+ Size Size `comment:"Size of the data portion"`
}
type SectionalNeedleValueExtra struct {
@@ -50,7 +50,7 @@ func NewCompactSection(start NeedleId) *CompactSection {
}
//return old entry size
-func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
+func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
cs.Lock()
if key > cs.end {
cs.end = key
@@ -80,7 +80,7 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs
return
}
-func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) {
+func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) {
needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size}
needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: offset.OffsetHigher}
insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
@@ -115,24 +115,21 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
return cs.overflow[i].Key >= key
})
if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
- for i := deleteCandidate; i < length-1; i++ {
- cs.overflow[i] = cs.overflow[i+1]
- cs.overflowExtra[i] = cs.overflowExtra[i+1]
+ if cs.overflow[deleteCandidate].Size.IsValid() {
+ cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size
}
- cs.overflow = cs.overflow[0 : length-1]
- cs.overflowExtra = cs.overflowExtra[0 : length-1]
}
}
//return old entry size
-func (cs *CompactSection) Delete(key NeedleId) uint32 {
+func (cs *CompactSection) Delete(key NeedleId) Size {
skey := SectionalNeedleId(key - cs.start)
cs.Lock()
- ret := uint32(0)
+ ret := Size(0)
if i := cs.binarySearchValues(skey); i >= 0 {
- if cs.values[i].Size > 0 && cs.values[i].Size != TombstoneFileSize {
+ if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
ret = cs.values[i].Size
- cs.values[i].Size = TombstoneFileSize
+ cs.values[i].Size = -cs.values[i].Size
}
}
if _, v, found := cs.findOverflowEntry(skey); found {
@@ -181,7 +178,7 @@ func NewCompactMap() *CompactMap {
return &CompactMap{}
}
-func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
+func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
x := cm.binarySearchCompactSection(key)
if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
// println(x, "adding to existing", len(cm.list), "sections, starting", key)
@@ -204,10 +201,10 @@ func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset O
// println(key, "set to section[", x, "].start", cm.list[x].start)
return cm.list[x].Set(key, offset, size)
}
-func (cm *CompactMap) Delete(key NeedleId) uint32 {
+func (cm *CompactMap) Delete(key NeedleId) Size {
x := cm.binarySearchCompactSection(key)
if x < 0 {
- return uint32(0)
+ return Size(0)
}
return cm.list[x].Delete(key)
}
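The compact map no longer shifts the overflow slice to remove a deleted key; it keeps the entry and flips its Size negative, which is why the TestOverflow expectation later in this diff changes from 4 to 5 entries. A small sketch of the negate-in-place delete on a sorted slice, with sectional ids and sizes as plain integers:

package main

import (
	"fmt"
	"sort"
)

type overflowEntry struct {
	key  uint32
	size int32 // negative means deleted, magnitude is the old size
}

// softDeleteOverflow finds key by binary search and negates its size instead
// of cutting it out of the slice, so the slot (and the old size) survive.
func softDeleteOverflow(entries []overflowEntry, key uint32) {
	i := sort.Search(len(entries), func(i int) bool { return entries[i].key >= key })
	if i < len(entries) && entries[i].key == key && entries[i].size > 0 {
		entries[i].size = -entries[i].size
	}
}

func main() {
	entries := []overflowEntry{{1, 100}, {4, 200}, {9, 300}}
	softDeleteOverflow(entries, 4)
	fmt.Println(entries) // [{1 100} {4 -200} {9 300}]
}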
diff --git a/weed/storage/needle_map/compact_map_perf_test.go b/weed/storage/needle_map/compact_map_perf_test.go
index cce1f9490..081fb34e9 100644
--- a/weed/storage/needle_map/compact_map_perf_test.go
+++ b/weed/storage/needle_map/compact_map_perf_test.go
@@ -9,7 +9,6 @@ import (
"time"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
/*
@@ -60,7 +59,7 @@ func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) {
rowCount++
key := BytesToNeedleId(bytes[i : i+NeedleIdSize])
offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize])
- size := util.BytesToUint32(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize])
+ size := BytesToSize(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize])
if !offset.IsZero() {
m.Set(NeedleId(key), offset, size)
diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go
index 7eea3969a..199cb26b3 100644
--- a/weed/storage/needle_map/compact_map_test.go
+++ b/weed/storage/needle_map/compact_map_test.go
@@ -49,7 +49,7 @@ func TestIssue52(t *testing.T) {
func TestCompactMap(t *testing.T) {
m := NewCompactMap()
for i := uint32(0); i < 100*batch; i += 2 {
- m.Set(NeedleId(i), ToOffset(int64(i)), i)
+ m.Set(NeedleId(i), ToOffset(int64(i)), Size(i))
}
for i := uint32(0); i < 100*batch; i += 37 {
@@ -57,7 +57,7 @@ func TestCompactMap(t *testing.T) {
}
for i := uint32(0); i < 10*batch; i += 3 {
- m.Set(NeedleId(i), ToOffset(int64(i+11)), i+5)
+ m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5))
}
// for i := uint32(0); i < 100; i++ {
@@ -72,15 +72,15 @@ func TestCompactMap(t *testing.T) {
if !ok {
t.Fatal("key", i, "missing!")
}
- if v.Size != i+5 {
+ if v.Size != Size(i+5) {
t.Fatal("key", i, "size", v.Size)
}
} else if i%37 == 0 {
- if ok && v.Size != TombstoneFileSize {
+ if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 {
- if v.Size != i {
+ if v.Size != Size(i) {
t.Fatal("key", i, "size", v.Size)
}
}
@@ -89,14 +89,14 @@ func TestCompactMap(t *testing.T) {
for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(NeedleId(i))
if i%37 == 0 {
- if ok && v.Size != TombstoneFileSize {
+ if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 {
if v == nil {
t.Fatal("key", i, "missing")
}
- if v.Size != i {
+ if v.Size != Size(i) {
t.Fatal("key", i, "size", v.Size)
}
}
@@ -129,8 +129,8 @@ func TestOverflow(t *testing.T) {
cs.deleteOverflowEntry(4)
- if len(cs.overflow) != 4 {
- t.Fatalf("expecting 4 entries now: %+v", cs.overflow)
+ if len(cs.overflow) != 5 {
+ t.Fatalf("expecting 5 entries now: %+v", cs.overflow)
}
_, x, _ := cs.findOverflowEntry(5)
@@ -146,7 +146,7 @@ func TestOverflow(t *testing.T) {
cs.deleteOverflowEntry(1)
for i, x := range cs.overflow {
- println("overflow[", i, "]:", x.Key)
+ println("overflow[", i, "]:", x.Key, "size", x.Size)
}
println()
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index a52d52a10..b25b5e89a 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -11,7 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
//This map uses in memory level db
@@ -32,7 +31,7 @@ func NewMemDb() *MemDb {
return t
}
-func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error {
+func (cm *MemDb) Set(key NeedleId, offset Offset, size Size) error {
bytes := ToBytes(key, offset, size)
@@ -56,7 +55,7 @@ func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
return nil, false
}
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
return &NeedleValue{Key: key, Offset: offset, Size: size}, true
}
@@ -67,7 +66,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
key := BytesToNeedleId(iter.Key())
data := iter.Value()
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
needle := NeedleValue{Key: key, Offset: offset, Size: size}
ret = visit(needle)
@@ -89,7 +88,7 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
defer idxFile.Close()
return cm.AscendingVisit(func(value NeedleValue) error {
- if value.Offset.IsZero() || value.Size == TombstoneFileSize {
+ if value.Offset.IsZero() || value.Size.IsDeleted() {
return nil
}
_, err := idxFile.Write(value.ToBytes())
@@ -105,8 +104,8 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
}
defer idxFile.Close()
- return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
- if offset.IsZero() || size == TombstoneFileSize {
+ return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error {
+ if offset.IsZero() || size.IsDeleted() {
return cm.Delete(key)
}
return cm.Set(key, offset, size)
diff --git a/weed/storage/needle_map/needle_value.go b/weed/storage/needle_map/needle_value.go
index ef540b55e..f8d614660 100644
--- a/weed/storage/needle_map/needle_value.go
+++ b/weed/storage/needle_map/needle_value.go
@@ -9,7 +9,7 @@ import (
type NeedleValue struct {
Key NeedleId
Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
+ Size Size `comment:"Size of the data portion"`
}
func (this NeedleValue) Less(than btree.Item) bool {
@@ -21,10 +21,10 @@ func (nv NeedleValue) ToBytes() []byte {
return ToBytes(nv.Key, nv.Offset, nv.Size)
}
-func ToBytes(key NeedleId, offset Offset, size uint32) []byte {
+func ToBytes(key NeedleId, offset Offset, size Size) []byte {
bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
- util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
+ util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], uint32(size))
return bytes
}
diff --git a/weed/storage/needle_map/needle_value_map.go b/weed/storage/needle_map/needle_value_map.go
index 0a5a00ef7..a30cb96c4 100644
--- a/weed/storage/needle_map/needle_value_map.go
+++ b/weed/storage/needle_map/needle_value_map.go
@@ -5,8 +5,8 @@ import (
)
type NeedleValueMap interface {
- Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32)
- Delete(key NeedleId) uint32
+ Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size)
+ Delete(key NeedleId) Size
Get(key NeedleId) (*NeedleValue, bool)
AscendingVisit(visit func(NeedleValue) error) error
}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 83589c231..415cd14dd 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
type LevelDbNeedleMap struct {
@@ -74,8 +73,8 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err
}
defer db.Close()
- return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
- if !offset.IsZero() && size != TombstoneFileSize {
+ return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
+ if !offset.IsZero() && size.IsValid() {
levelDbWrite(db, key, offset, size)
} else {
levelDbDelete(db, key)
@@ -92,12 +91,12 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
return nil, false
}
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}
-func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
- var oldSize uint32
+func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
+ var oldSize Size
if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size
}
@@ -109,7 +108,7 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
return levelDbWrite(m.db, key, offset, size)
}
-func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size uint32) error {
+func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size) error {
bytes := needle_map.ToBytes(key, offset, size)
@@ -125,14 +124,18 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
- if oldNeedle, ok := m.Get(key); ok {
- m.logDelete(oldNeedle.Size)
+ oldNeedle, found := m.Get(key)
+ if !found || oldNeedle.Size.IsDeleted() {
+ return nil
}
+ m.logDelete(oldNeedle.Size)
+
// write to index file first
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
- return levelDbDelete(m.db, key)
+
+ return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size)
}
func (m *LevelDbNeedleMap) Close() {
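Delete no longer removes the LevelDB key; after appending a tombstone to the .idx file it writes the entry back with a negated size, so the offset of the deleted needle stays discoverable. A sketch of the same idea against a plain map standing in for the LevelDB store:

package main

import "fmt"

type needleValue struct {
	offset int64
	size   int32 // negative: deleted, magnitude preserved
}

// softDelete marks key deleted without dropping it; repeat deletes and
// missing keys are no-ops, matching the early return added in the diff.
func softDelete(m map[uint64]needleValue, key uint64) {
	v, ok := m[key]
	if !ok || v.size < 0 {
		return
	}
	v.size = -v.size
	m[key] = v
}

func main() {
	m := map[uint64]needleValue{42: {offset: 4096, size: 1234}}
	softDelete(m, 42)
	softDelete(m, 42) // second delete is a no-op
	fmt.Println(m[42]) // {4096 -1234}
}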
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index 84197912f..d0891dc98 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -28,13 +28,13 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
}
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
- e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
+ e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
nm.MaybeSetMaxFileKey(key)
- if !offset.IsZero() && size != TombstoneFileSize {
+ if !offset.IsZero() && size.IsValid() {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
- if !oldOffset.IsZero() && oldSize != TombstoneFileSize {
+ if !oldOffset.IsZero() && oldSize.IsValid() {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
@@ -49,7 +49,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
return nm, e
}
-func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error {
_, oldSize := nm.m.Set(NeedleId(key), offset, size)
nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size)
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index 823a04108..3618dada9 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -18,31 +18,31 @@ type mapMetric struct {
MaximumFileKey uint64 `json:"MaxFileKey"`
}
-func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+func (mm *mapMetric) logDelete(deletedByteCount Size) {
if mm == nil {
return
}
mm.LogDeletionCounter(deletedByteCount)
}
-func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
+func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
if mm == nil {
return
}
mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize)
- if oldSize > 0 && oldSize != TombstoneFileSize {
+ if oldSize > 0 && oldSize.IsValid() {
mm.LogDeletionCounter(oldSize)
}
}
-func (mm *mapMetric) LogFileCounter(newSize uint32) {
+func (mm *mapMetric) LogFileCounter(newSize Size) {
if mm == nil {
return
}
atomic.AddUint32(&mm.FileCounter, 1)
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
}
-func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
+func (mm *mapMetric) LogDeletionCounter(oldSize Size) {
if mm == nil {
return
}
@@ -97,11 +97,11 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
buf := make([]byte, NeedleIdSize)
err = reverseWalkIndexFile(r, func(entryCount int64) {
bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
- }, func(key NeedleId, offset Offset, size uint32) error {
+ }, func(key NeedleId, offset Offset, size Size) error {
mm.MaybeSetMaxFileKey(key)
NeedleIdToBytes(buf, key)
- if size != TombstoneFileSize {
+ if size.IsValid() {
mm.FileByteCounter += uint64(size)
}
@@ -111,7 +111,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
} else {
// deleted file
mm.DeletionCounter++
- if size != TombstoneFileSize {
+ if size.IsValid() {
// previously already deleted file
mm.DeletionByteCounter += uint64(size)
}
@@ -121,7 +121,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
return
}
-func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error {
+func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error {
fi, err := r.Stat()
if err != nil {
return fmt.Errorf("file %s stat error: %v", r.Name(), err)
diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go
index ae2177a30..362659a11 100644
--- a/weed/storage/needle_map_metric_test.go
+++ b/weed/storage/needle_map_metric_test.go
@@ -15,7 +15,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) {
nm := NewCompactNeedleMap(idxFile)
for i := 0; i < 10000; i++ {
- nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1))
+ nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1))
if rand.Float32() < 0.2 {
nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0)))
}
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index e6f9258f3..1ca113ca9 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -65,7 +65,7 @@ func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue
}
-func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
return os.ErrInvalid
}
@@ -80,7 +80,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
return err
}
- if size == TombstoneFileSize {
+ if size.IsDeleted() {
return nil
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 02372da97..d5d59235a 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -23,6 +23,10 @@ const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
+type ReadOption struct {
+ ReadDeleted bool
+}
+
/*
* A VolumeServer contains one Store
*/
@@ -273,7 +277,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync boo
return
}
-func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
+func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, error) {
if v := s.findVolume(i); v != nil {
if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
@@ -283,9 +287,9 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32,
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
-func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) {
if v := s.findVolume(i); v != nil {
- return v.readNeedle(n)
+ return v.readNeedle(n, readOption)
}
return 0, fmt.Errorf("volume %d not found", i)
}
@@ -303,7 +307,20 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
+ v.noWriteLock.Lock()
v.noWriteOrDelete = true
+ v.noWriteLock.Unlock()
+ return nil
+}
+
+func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
+ v := s.findVolume(i)
+ if v == nil {
+ return fmt.Errorf("volume %d not found", i)
+ }
+ v.noWriteLock.Lock()
+ v.noWriteOrDelete = false
+ v.noWriteLock.Unlock()
return nil
}
@@ -363,10 +380,12 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
Ttl: v.Ttl.ToUint32(),
}
for _, location := range s.Locations {
- if found, error := location.deleteVolumeById(i); found && error == nil {
+ if err := location.DeleteVolume(i); err == nil {
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil
+ } else {
+ glog.Errorf("DeleteVolume %d: %v", i, err)
}
}
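The new ReadOption threads a ReadDeleted flag from the store down to Volume.readNeedle (changed later in this diff), which can flip a negated size back to its magnitude while still refusing the legacy -1 tombstone. A simplified sketch of that size decision on its own:

package main

import (
	"errors"
	"fmt"
)

type Size int32

const tombstoneFileSize = Size(-1)

// effectiveReadSize mirrors the shape of the readNeedle logic: live sizes pass
// through, soft-deleted sizes are recovered only when the caller opted in, and
// the legacy tombstone value stays unreadable.
func effectiveReadSize(stored Size, readDeleted bool) (Size, error) {
	if stored >= 0 {
		return stored, nil
	}
	if readDeleted && stored != tombstoneFileSize {
		return -stored, nil
	}
	return 0, errors.New("already deleted")
}

func main() {
	fmt.Println(effectiveReadSize(2048, false))             // 2048 <nil>
	fmt.Println(effectiveReadSize(-2048, true))             // 2048 <nil>
	fmt.Println(effectiveReadSize(-2048, false))            // 0 already deleted
	fmt.Println(effectiveReadSize(tombstoneFileSize, true)) // 0 already deleted
}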
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 2b0df439c..bd7bdacbd 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "os"
"sort"
"sync"
"time"
@@ -59,6 +60,8 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
}
return nil
+ } else if err == os.ErrNotExist {
+ continue
} else {
return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
}
@@ -124,7 +127,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
if err != nil {
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return 0, fmt.Errorf("entry %s is deleted", n.Id)
}
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index 38159496e..32666a417 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -16,6 +17,10 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil {
+ s := stats.NewDiskStatus(v.dir)
+ if int64(s.Free) < preallocate {
+ return fmt.Errorf("free space: %d bytes, not enough for %d bytes", s.Free, preallocate)
+ }
return v.Compact2(preallocate, compactionBytePerSecond)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go
index 2ebb392db..137b97d7f 100644
--- a/weed/storage/types/needle_types.go
+++ b/weed/storage/types/needle_types.go
@@ -2,9 +2,9 @@ package types
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "math"
"strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Offset struct {
@@ -12,6 +12,15 @@ type Offset struct {
OffsetLower
}
+type Size int32
+
+func (s Size) IsDeleted() bool {
+ return s < 0 || s == TombstoneFileSize
+}
+func (s Size) IsValid() bool {
+ return s > 0 && s != TombstoneFileSize
+}
+
type OffsetLower struct {
b3 byte
b2 byte
@@ -27,7 +36,7 @@ const (
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
- TombstoneFileSize = math.MaxUint32
+ TombstoneFileSize = Size(-1)
CookieSize = 4
)
@@ -49,3 +58,11 @@ func ParseCookie(cookieString string) (Cookie, error) {
}
return Cookie(cookie), nil
}
+
+func BytesToSize(bytes []byte) Size {
+ return Size(util.BytesToUint32(bytes))
+}
+
+func SizeToBytes(bytes []byte, size Size) {
+ util.Uint32toBytes(bytes, uint32(size))
+}
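The new Size type packs two facts into one signed value: a positive value is a live needle of that many bytes, a negated value means the needle was deleted but its original size is still known, and -1 remains the tombstone written to index files. A short demo of the semantics and of the 4-byte round-trip through SizeToBytes/BytesToSize (big-endian is assumed here for illustration; the round-trip only depends on the uint32 cast used in the diff):

package main

import (
	"encoding/binary"
	"fmt"
)

type Size int32

const TombstoneFileSize = Size(-1)

func (s Size) IsDeleted() bool { return s < 0 || s == TombstoneFileSize }
func (s Size) IsValid() bool   { return s > 0 && s != TombstoneFileSize }

// sizeToBytes/bytesToSize round-trip through the same uint32 cast the diff
// uses, so Size(-1) is stored as 0xFFFFFFFF and comes back as -1.
func sizeToBytes(b []byte, s Size) { binary.BigEndian.PutUint32(b, uint32(s)) }
func bytesToSize(b []byte) Size    { return Size(binary.BigEndian.Uint32(b)) }

func main() {
	live, deleted := Size(1024), -Size(1024)
	fmt.Println(live.IsValid(), deleted.IsDeleted(), TombstoneFileSize.IsDeleted()) // true true true

	buf := make([]byte, 4)
	sizeToBytes(buf, TombstoneFileSize)
	fmt.Printf("% x -> %d\n", buf, bytesToSize(buf)) // ff ff ff ff -> -1
}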
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 73fdb417d..2d46fbcdf 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -27,6 +27,7 @@ type Volume struct {
needleMapKind NeedleMapType
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
+ noWriteLock sync.RWMutex
hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
@@ -58,6 +59,8 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
}
func (v *Volume) String() string {
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
@@ -245,5 +248,7 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
}
func (v *Volume) IsReadOnly() bool {
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
}
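The new noWriteLock guards the read-only flags so that MarkVolumeReadonly/MarkVolumeWritable (writers) and IsReadOnly/String (readers) do not race. The lock discipline in isolation, as a sketch:

package main

import (
	"fmt"
	"sync"
)

// readOnlyFlag shows the pattern added in the diff: mutations take the write
// lock, checks take the read lock, so many readers can proceed at once.
type readOnlyFlag struct {
	mu       sync.RWMutex
	readOnly bool
}

func (f *readOnlyFlag) Set(v bool) {
	f.mu.Lock()
	f.readOnly = v
	f.mu.Unlock()
}

func (f *readOnlyFlag) Get() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.readOnly
}

func main() {
	var f readOnlyFlag
	f.Set(true)
	fmt.Println(f.Get()) // true
}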
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index f7075fe2b..595bd8a35 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -253,7 +253,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
}
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
- if n.Size > 0 && n.Size != TombstoneFileSize {
+ if n.Size > 0 && n.Size.IsValid() {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
}
return scanner.v.nm.Delete(n.Id, ToOffset(offset))
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index c33f0049a..e42fb238b 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -27,11 +27,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
if offset.IsZero() {
return 0, nil
}
- if size == TombstoneFileSize {
- size = 0
- }
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if size < 0 {
+ // read the deletion entry
+ if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ }
+ } else {
+ if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ }
}
return
}
@@ -55,7 +59,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
+func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
@@ -65,3 +69,20 @@ func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version,
}
return n.AppendAtNs, err
}
+
+func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) {
+ n := new(needle.Needle)
+ size := n.DiskSize(v)
+ var fileSize int64
+ fileSize, _, err = datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("GetStat: %v", err)
+ }
+ if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil {
+ return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
+ }
+ if n.Id != key {
+ return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+ }
+ return n.AppendAtNs, err
+}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index edb5f48d8..e11bde2cb 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -25,7 +25,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
}
nv, ok := v.nm.Get(n.Id)
- if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize {
+ if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
oldNeedle := new(needle.Needle)
err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
if err != nil {
@@ -68,9 +68,9 @@ func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
v.asyncRequestsChan <- request
}
-func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
+func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
- actualSize := needle.GetActualSize(uint32(len(n.Data)), v.Version())
+ actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version())
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@@ -80,7 +80,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size uint32, isUnch
return
}
if v.isFileUnchanged(n) {
- size = n.DataSize
+ size = Size(n.DataSize)
isUnchanged = true
return
}
@@ -120,7 +120,7 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size uint32, isUnch
return
}
-func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size uint32, isUnchanged bool, err error) {
+func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
n.SetHasTtl()
@@ -132,7 +132,7 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size
} else {
asyncRequest := needle.NewAsyncRequest(n, true)
// using len(n.Data) here instead of n.Size before n.Size is populated in n.Append()
- asyncRequest.ActualSize = needle.GetActualSize(uint32(len(n.Data)), v.Version())
+ asyncRequest.ActualSize = needle.GetActualSize(Size(len(n.Data)), v.Version())
v.asyncRequestAppend(asyncRequest)
offset, _, isUnchanged, err = asyncRequest.WaitComplete()
@@ -141,10 +141,10 @@ func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size
}
}
-func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
+func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v.isFileUnchanged(n) {
- size = n.DataSize
+ size = Size(n.DataSize)
isUnchanged = true
return
}
@@ -183,7 +183,7 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size uint32, i
return
}
-func (v *Volume) syncDelete(n *needle.Needle) (uint32, error) {
+func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
actualSize := needle.GetActualSize(0, v.Version())
v.dataFileAccessLock.Lock()
@@ -195,8 +195,8 @@ func (v *Volume) syncDelete(n *needle.Needle) (uint32, error) {
}
nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok && nv.Size != TombstoneFileSize {
+ // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
@@ -213,7 +213,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (uint32, error) {
return 0, nil
}
-func (v *Volume) deleteNeedle2(n *needle.Needle) (uint32, error) {
+func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
// todo: the delete record is always appended without fsync; it may need fsync in the future
fsync := false
@@ -226,15 +226,15 @@ func (v *Volume) deleteNeedle2(n *needle.Needle) (uint32, error) {
v.asyncRequestAppend(asyncRequest)
_, size, _, err := asyncRequest.WaitComplete()
- return uint32(size), err
+ return Size(size), err
}
}
-func (v *Volume) doDeleteRequest(n *needle.Needle) (uint32, error) {
+func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok && nv.Size != TombstoneFileSize {
+ // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
@@ -252,7 +252,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (uint32, error) {
}
// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
@@ -260,13 +260,19 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
if !ok || nv.Offset.IsZero() {
return -1, ErrorNotFound
}
- if nv.Size == TombstoneFileSize {
- return -1, errors.New("already deleted")
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ glog.V(3).Infof("reading deleted %s", n.String())
+ readSize = -readSize
+ } else {
+ return -1, errors.New("already deleted")
+ }
}
- if nv.Size == 0 {
+ if readSize == 0 {
return 0, nil
}
- err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+ err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
if err != nil {
return 0, err
}
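readNeedle gains a ReadOption parameter. With the negated-size encoding, a soft-deleted needle keeps its original length, so an opted-in caller can flip the sign and still read the data back; only true tombstones stay unreadable. A hypothetical in-package caller (the helper name is made up for illustration, only ReadOption.ReadDeleted comes from the hunk above):

// readMaybeDeleted is a hypothetical wrapper: pass nil to keep the old
// behaviour, or opt in to reading soft-deleted needles for repair/inspection.
func readMaybeDeleted(v *Volume, n *needle.Needle, includeDeleted bool) (int, error) {
	if !includeDeleted {
		return v.readNeedle(n, nil)
	}
	return v.readNeedle(n, &ReadOption{ReadDeleted: true})
}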
@@ -299,7 +305,7 @@ func (v *Volume) startWorker() {
currentBytesToWrite := int64(0)
for {
request, ok := <-v.asyncRequestsChan
- //volume may be closed
+ // volume may be closed
if !ok {
chanClosed = true
break
@@ -375,10 +381,8 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err)
}
- if v.volumeInfo.Version == 0 {
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
- }
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
}
defer v.Close()
@@ -400,10 +404,11 @@ func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorag
for n != nil {
var needleBody []byte
if volumeFileScanner.ReadNeedleBody() {
+ // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle body: %v", err)
- //err = fmt.Errorf("cannot read needle body: %v", err)
- //return
+ glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
+ // err = fmt.Errorf("cannot read needle body: %v", err)
+ // return
}
}
err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
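With the Version == 0 guard removed, VisitSuperBlock is now called on every scan. The scanner contract, as exercised by the two hunks above, roughly has the shape below; the parameter and SuperBlock types are inferred from the call sites, not copied from the interface definition, so treat this as a sketch:

// Sketch of the volume-file scanner contract used by ScanVolumeFile/ScanVolumeFileFrom.
type volumeFileScannerSketch interface {
	// Called once per scan, now unconditionally.
	VisitSuperBlock(superBlock super_block.SuperBlock) error
	// Whether the needle body should be read and passed to VisitNeedle.
	ReadNeedleBody() bool
	// Called for every needle header (and body, if requested) in the volume.
	VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}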
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 5e913e062..20223ac1b 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -26,8 +26,10 @@ func (v *Volume) maybeWriteSuperBlock() error {
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
+ v.noWriteLock.Lock()
v.noWriteOrDelete = false
v.noWriteCanDelete = false
+ v.noWriteLock.Unlock()
}
}
}
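The write-protection flags are now flipped only while holding noWriteLock. Two hypothetical helpers showing the same pattern, assuming noWriteLock is a sync.RWMutex on Volume; neither function exists in the codebase:

// markWritable clears both protection flags under the write lock.
func (v *Volume) markWritable() {
	v.noWriteLock.Lock()
	v.noWriteOrDelete = false
	v.noWriteCanDelete = false
	v.noWriteLock.Unlock()
}

// canWrite checks the flag under the read lock before a write is attempted.
func (v *Volume) canWrite() bool {
	v.noWriteLock.RLock()
	defer v.noWriteLock.RUnlock()
	return !v.noWriteOrDelete
}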
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index ed8172909..a3e5800df 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -207,7 +207,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type keyField struct {
offset Offset
- size uint32
+ size Size
}
incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
@@ -274,7 +274,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
//updated needle
- if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize {
+ if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
// even if the needle cache in memory is hit, the needle bytes are correct
glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
var needleBytes []byte
@@ -335,7 +335,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
+ if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
@@ -413,7 +413,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
offset, size := value.Offset, value.Size
- if offset.IsZero() || size == TombstoneFileSize {
+ if offset.IsZero() || size.IsDeleted() {
return nil
}
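A hypothetical predicate capturing the filter applied during compaction above: only live entries, with a non-zero offset and a valid (positive, non-tombstone) size, are carried over; the needle_map.NeedleValue shape is taken from the call site, not its definition:

// shouldKeepDuringVacuum reports whether an index entry survives compaction.
func shouldKeepDuringVacuum(nv needle_map.NeedleValue) bool {
	return !nv.Offset.IsZero() && nv.Size.IsValid()
}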
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 1b5161e63..f96e9b0cf 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -113,11 +113,11 @@ func TestCompaction(t *testing.T) {
}
n := newEmptyNeedle(uint64(i))
- size, err := v.readNeedle(n)
+ size, err := v.readNeedle(n, nil)
if err != nil {
t.Fatalf("read file %d: %v", i, err)
}
- if infos[i-1].size != uint32(size) {
+ if infos[i-1].size != types.Size(size) {
t.Fatalf("read file %d size mismatch expected %d found %d", i, infos[i-1].size, size)
}
if infos[i-1].crc != n.Checksum {
@@ -151,7 +151,7 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
}
type needleInfo struct {
- size uint32
+ size types.Size
crc needle.CRC
}