aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_read_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_read_write.go')
-rw-r--r--weed/storage/volume_read_write.go30
1 files changed, 23 insertions, 7 deletions
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 7458b4879..66f18557f 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -60,6 +60,8 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
+ } else if offset != int64(v.dataFileSize) {
+ glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
//ensure file writing starting from aligned positions
if offset%NeedlePaddingSize != 0 {
@@ -67,9 +69,12 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
if offset, err = v.dataFile.Seek(offset, 0); err != nil {
glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
return
+ } else if offset != int64(v.dataFileSize) {
+ glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
}
- v.dataFile.Write(b)
+ _, err = v.dataFile.Write(b)
+ v.dataFileSize += int64(len(b))
return
}
@@ -86,10 +91,12 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
glog.V(4).Infof("needle is unchanged!")
return
}
- var offset int64
+ var offset, actualSize int64
if offset, err = v.dataFile.Seek(0, 2); err != nil {
glog.V(0).Infof("failed to seek the end of file: %v", err)
return
+ } else if offset != int64(v.dataFileSize) {
+ glog.V(0).Infof("dataFileSize %d != actual data file size: %d", v.dataFileSize, offset)
}
//ensure file writing starting from aligned positions
@@ -101,12 +108,14 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
}
}
- if size, err = n.Append(v.dataFile, v.Version()); err != nil {
+ if size, actualSize, err = n.Append(v.dataFile, v.Version()); err != nil {
if e := v.dataFile.Truncate(offset); e != nil {
err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
}
return
}
+ v.dataFileSize += actualSize
+
nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
@@ -128,16 +137,20 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
defer v.dataFileAccessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok {
+ if ok && nv.Size != TombstoneFileSize {
size := nv.Size
- if err := v.nm.Delete(n.Id); err != nil {
+ // println("adding tombstone", n.Id, "at offset", v.dataFileSize)
+ if err := v.nm.Delete(n.Id, uint32(v.dataFileSize/NeedlePaddingSize)); err != nil {
return size, err
}
- if _, err := v.dataFile.Seek(0, 2); err != nil {
+ if offset, err := v.dataFile.Seek(0, 2); err != nil {
return size, err
+ } else if offset != int64(v.dataFileSize) {
+ glog.V(0).Infof("dataFileSize %d != actual data file size: %d, deleteMarker: %d", v.dataFileSize, offset, getActualSize(0))
}
n.Data = nil
- _, err := n.Append(v.dataFile, v.Version())
+ _, actualSize, err := n.Append(v.dataFile, v.Version())
+ v.dataFileSize += actualSize
return size, err
}
return 0, nil
@@ -149,6 +162,9 @@ func (v *Volume) readNeedle(n *Needle) (int, error) {
if !ok || nv.Offset == 0 {
return -1, errors.New("Not Found")
}
+ if nv.Size == TombstoneFileSize {
+ return -1, errors.New("Already Deleted")
+ }
err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
if err != nil {
return 0, err