aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-10 14:42:50 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-10 14:42:52 -0700
commitc9ab8d05fa9b425352ce978b5c5b5b0d71d787ad (patch)
tree54eb1b881c6345a66fadca7ce9db1cfd0e800fad /weed
parent07ec6c89a01e55d1745ddc75395ecd0dfe4cce02 (diff)
downloadseaweedfs-c9ab8d05fa9b425352ce978b5c5b5b0d71d787ad.tar.xz
seaweedfs-c9ab8d05fa9b425352ce978b5c5b5b0d71d787ad.zip
fixes for reading deleted fid
Diffstat (limited to 'weed')
-rw-r--r--weed/command/fix.go12
-rw-r--r--weed/storage/needle_map.go2
-rw-r--r--weed/storage/needle_map/memdb.go7
-rw-r--r--weed/storage/needle_map_leveldb.go6
-rw-r--r--weed/storage/needle_map_memory.go19
-rw-r--r--weed/storage/needle_map_sorted_file.go6
-rw-r--r--weed/storage/volume_backup.go2
-rw-r--r--weed/storage/volume_read_write.go10
-rw-r--r--weed/storage/volume_vacuum.go4
9 files changed, 43 insertions, 25 deletions
diff --git a/weed/command/fix.go b/weed/command/fix.go
index ae9a051b8..a3435804b 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -30,6 +30,7 @@ var (
fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files")
fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+ fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", false, "include deleted entries in the index file")
)
type VolumeFileScanner4Fix struct {
@@ -50,9 +51,14 @@ func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
if n.Size.IsValid() {
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
- glog.V(2).Infof("saved %d with error %v", n.Size, pe)
+ glog.V(2).Infof("saved %s %d bytes with error %v", n.Id.String(), n.Size, pe)
} else {
- glog.V(2).Infof("skipping deleted file ...")
+ if val, found := scanner.nm.Get(n.Id); *fixIncludeDeleted && found && val.Size > 0 {
+ pe := scanner.nm.Set(n.Id, val.Offset, -val.Size)
+ glog.V(2).Infof("update deleted %s %d bytes with error %v", n.Id.String(), -val.Size, pe)
+ return nil
+ }
+ glog.V(1).Infof("skipping deleted file %s size %d ...", n.Id.String(), n.Size)
return scanner.nm.Delete(n.Id)
}
return nil
@@ -83,7 +89,7 @@ func runFix(cmd *Command, args []string) bool {
os.Remove(indexFileName)
}
- if err := nm.SaveToIdx(indexFileName); err != nil {
+ if err := nm.SaveToIdx(indexFileName, *fixIncludeDeleted); err != nil {
glog.Fatalf("save to .idx File: %v", err)
os.Remove(indexFileName)
}
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index e91856dfe..1662a322e 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -21,7 +21,7 @@ const (
type NeedleMapper interface {
Put(key NeedleId, offset Offset, size Size) error
Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
- Delete(key NeedleId, offset Offset) error
+ Delete(key NeedleId) error
Close()
Destroy() error
ContentSize() uint64
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index b25b5e89a..eb9da7f18 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -80,7 +80,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
return
}
-func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
+func (cm *MemDb) SaveToIdx(idxName string, includeDeleted bool) (ret error) {
idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return
@@ -88,7 +88,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
defer idxFile.Close()
return cm.AscendingVisit(func(value NeedleValue) error {
- if value.Offset.IsZero() || value.Size.IsDeleted() {
+ if value.Offset.IsZero() {
+ return nil
+ }
+ if !includeDeleted && value.Size.IsDeleted() {
return nil
}
_, err := idxFile.Write(value.ToBytes())
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 415cd14dd..c8820bdb7 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -74,7 +74,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
}
defer db.Close()
return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
- if !offset.IsZero() && size.IsValid() {
+ if !offset.IsZero() {
levelDbWrite(db, key, offset, size)
} else {
levelDbDelete(db, key)
@@ -123,7 +123,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
return db.Delete(bytes, nil)
}
-func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
+func (m *LevelDbNeedleMap) Delete(key NeedleId) error {
oldNeedle, found := m.Get(key)
if !found || oldNeedle.Size.IsDeleted() {
return nil
@@ -131,7 +131,7 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
m.logDelete(oldNeedle.Size)
// write to index file first
- if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
+ if err := m.appendToIndexFile(key, oldNeedle.Offset, -oldNeedle.Size); err != nil {
return err
}
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index d0891dc98..3fa85deda 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -30,13 +30,18 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
nm.MaybeSetMaxFileKey(key)
- if !offset.IsZero() && size.IsValid() {
+ if !offset.IsZero() {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
if !oldOffset.IsZero() && oldSize.IsValid() {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
+ } else if size < 0 {
+ // deletion
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(-size)
}
} else {
oldSize := nm.m.Delete(NeedleId(key))
@@ -54,14 +59,18 @@ func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error {
nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size)
}
-func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
- element, ok = nm.m.Get(NeedleId(key))
+func (nm *NeedleMap) Get(key NeedleId) (existingValue *needle_map.NeedleValue, ok bool) {
+ existingValue, ok = nm.m.Get(NeedleId(key))
return
}
-func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
+func (nm *NeedleMap) Delete(key NeedleId) error {
+ existingValue, ok := nm.m.Get(NeedleId(key))
+ if !ok || existingValue.Size.IsDeleted(){
+ return nil
+ }
deletedBytes := nm.m.Delete(NeedleId(key))
nm.logDelete(deletedBytes)
- return nm.appendToIndexFile(key, offset, TombstoneFileSize)
+ return nm.appendToIndexFile(key, existingValue.Offset, -existingValue.Size)
}
func (nm *NeedleMap) Close() {
indexFileName := nm.indexFile.Name()
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 1ca113ca9..afb1e782f 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -69,9 +69,9 @@ func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error
return os.ErrInvalid
}
-func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
+func (m *SortedFileNeedleMap) Delete(key NeedleId) error {
- _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
+ offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
if err != nil {
if err == erasure_coding.NotFoundError {
@@ -85,7 +85,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
}
// write to index file first
- if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
+ if err := m.appendToIndexFile(key, offset, -size); err != nil {
return err
}
_, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 595bd8a35..fdae1add4 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -256,5 +256,5 @@ func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset in
if n.Size > 0 && n.Size.IsValid() {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
}
- return scanner.v.nm.Delete(n.Id, ToOffset(offset))
+ return scanner.v.nm.Delete(n.Id)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index e77010dbd..08cbad57b 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -200,12 +200,12 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.DataBackend, v.Version())
+ _, _, _, err := n.Append(v.DataBackend, v.Version())
if err != nil {
return size, err
}
v.lastAppendAtNs = n.AppendAtNs
- if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
+ if err = v.nm.Delete(n.Id); err != nil {
return size, err
}
return size, err
@@ -238,12 +238,12 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
size := nv.Size
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.DataBackend, v.Version())
+ _, _, _, err := n.Append(v.DataBackend, v.Version())
if err != nil {
return size, err
}
v.lastAppendAtNs = n.AppendAtNs
- if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
+ if err = v.nm.Delete(n.Id); err != nil {
return size, err
}
return size, err
@@ -263,7 +263,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
readSize := nv.Size
if readSize.IsDeleted() {
if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
- glog.V(3).Infof("reading deleted %s", n.String())
+ glog.V(3).Infof("reading deleted %s size %d", n.String(), readSize)
readSize = -readSize
} else {
return -1, errors.New("already deleted")
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index a3e5800df..100067693 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -374,7 +374,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return nil
}
- err = nm.SaveToIdx(idxName)
+ err = nm.SaveToIdx(idxName, false)
return
}
@@ -441,7 +441,7 @@ func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName str
return nil
})
- newNm.SaveToIdx(datIdxName)
+ newNm.SaveToIdx(datIdxName, false)
return
}