aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-08-18 19:22:16 -0700
committerChris Lu <chris.lu@gmail.com>2020-08-18 19:22:16 -0700
commitfe01191b5b93c7f3851e7add9b046c89c8189cdc (patch)
tree37a167eac12e540c4a4e26460061fd0b8407bb9b
parent6ccd7f0a4d8ff5167054147cc66774fec84d80b6 (diff)
downloadseaweedfs-fe01191b5b93c7f3851e7add9b046c89c8189cdc.tar.xz
seaweedfs-fe01191b5b93c7f3851e7add9b046c89c8189cdc.zip
support read option readDeleted=true
-rw-r--r--weed/command/export.go2
-rw-r--r--weed/command/fix.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume_delete.go2
-rw-r--r--weed/storage/needle_map/compact_map.go9
-rw-r--r--weed/storage/needle_map_leveldb.go10
-rw-r--r--weed/storage/types/needle_types.go9
-rw-r--r--weed/storage/volume_read_write.go23
7 files changed, 31 insertions, 26 deletions
diff --git a/weed/command/export.go b/weed/command/export.go
index 0e2e7ccd9..3ea4b00d3 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -111,7 +111,7 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
nv, ok := needleMap.Get(n.Id)
glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
- if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset {
+ if ok && nv.Size.IsValid() && nv.Offset.ToAcutalOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix)
diff --git a/weed/command/fix.go b/weed/command/fix.go
index e1455790f..ae9a051b8 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -48,7 +48,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
- if n.Size > 0 && n.Size != types.TombstoneFileSize {
+ if n.Size.IsValid() {
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 822a9e923..a7f8c24a3 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -12,7 +12,7 @@ import (
var (
MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
- util.Uint32toBytes(b, types.TombstoneFileSize)
+ types.SizeToBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
return fmt.Errorf("sorted needle write error: %v", err)
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index c1fb00268..81ff27c45 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -115,12 +115,9 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
return cs.overflow[i].Key >= key
})
if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
- for i := deleteCandidate; i < length-1; i++ {
- cs.overflow[i] = cs.overflow[i+1]
- cs.overflowExtra[i] = cs.overflowExtra[i+1]
+ if cs.overflow[deleteCandidate].Size.IsValid() {
+ cs.overflow[deleteCandidate].Size = - cs.overflow[deleteCandidate].Size
}
- cs.overflow = cs.overflow[0 : length-1]
- cs.overflowExtra = cs.overflowExtra[0 : length-1]
}
}
@@ -132,7 +129,7 @@ func (cs *CompactSection) Delete(key NeedleId) Size {
if i := cs.binarySearchValues(skey); i >= 0 {
if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
ret = cs.values[i].Size
- cs.values[i].Size = TombstoneFileSize
+ cs.values[i].Size = -cs.values[i].Size
}
}
if _, v, found := cs.findOverflowEntry(skey); found {
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 4a17e6d0e..415cd14dd 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -124,14 +124,18 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
- if oldNeedle, ok := m.Get(key); ok {
- m.logDelete(oldNeedle.Size)
+ oldNeedle, found := m.Get(key)
+ if !found || oldNeedle.Size.IsDeleted() {
+ return nil
}
+ m.logDelete(oldNeedle.Size)
+
// write to index file first
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
- return levelDbDelete(m.db, key)
+
+ return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size)
}
func (m *LevelDbNeedleMap) Close() {
diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go
index 0e9115c0d..7e30d2bd8 100644
--- a/weed/storage/types/needle_types.go
+++ b/weed/storage/types/needle_types.go
@@ -2,7 +2,6 @@ package types
import (
"fmt"
- "math"
"strconv"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -13,13 +12,13 @@ type Offset struct {
OffsetLower
}
-type Size uint32
+type Size int32
func (s Size) IsDeleted() bool {
- return s == TombstoneFileSize
+ return s < 0 || s == TombstoneFileSize
}
func (s Size) IsValid() bool {
- return s != TombstoneFileSize
+ return s >0 && s != TombstoneFileSize
}
type OffsetLower struct {
@@ -37,7 +36,7 @@ const (
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
- TombstoneFileSize = math.MaxUint32
+ TombstoneFileSize = Size(-1)
CookieSize = 4
)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index b2487dde0..4a997832c 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -195,7 +195,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
}
nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
@@ -233,7 +233,7 @@ func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size.IsValid() {
size := nv.Size
n.Data = nil
@@ -260,13 +260,18 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if !ok || nv.Offset.IsZero() {
return -1, ErrorNotFound
}
- if nv.Size.IsDeleted() {
- return -1, errors.New("already deleted")
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ readSize = -readSize
+ } else {
+ return -1, errors.New("already deleted")
+ }
}
- if nv.Size == 0 {
+ if readSize == 0 {
return 0, nil
}
- err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
+ err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
if err != nil {
return 0, err
}
@@ -299,7 +304,7 @@ func (v *Volume) startWorker() {
currentBytesToWrite := int64(0)
for {
request, ok := <-v.asyncRequestsChan
- //volume may be closed
+ // volume may be closed
if !ok {
chanClosed = true
break
@@ -402,8 +407,8 @@ func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorag
if volumeFileScanner.ReadNeedleBody() {
if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
- //err = fmt.Errorf("cannot read needle body: %v", err)
- //return
+ // err = fmt.Errorf("cannot read needle body: %v", err)
+ // return
}
}
err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)