aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/storage/volume.go30
1 files changed, 26 insertions, 4 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 6ef72cfcd..1988c9aac 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -123,16 +123,21 @@ func (v *Volume) Size() int64 {
glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e)
return -1
}
+
+// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.accessLock.Lock()
defer v.accessLock.Unlock()
v.nm.Close()
_ = v.dataFile.Close()
}
+
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
+// isFileUnchanged checks whether this needle to write is same as last one.
+// It requires serialized access in the same volume.
func (v *Volume) isFileUnchanged(n *Needle) bool {
nv, ok := v.nm.Get(n.Id)
if ok && nv.Offset > 0 {
@@ -146,6 +151,7 @@ func (v *Volume) isFileUnchanged(n *Needle) bool {
return false
}
+// Destroy removes everything related to this volume
func (v *Volume) Destroy() (err error) {
if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
@@ -223,7 +229,7 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
if _, err := v.dataFile.Seek(0, 2); err != nil {
return size, err
}
- n.Data = make([]byte, 0)
+ n.Data = nil
_, err := n.Append(v.dataFile, v.Version())
return size, err
}
@@ -304,20 +310,36 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
for n != nil {
if readNeedleBody {
if err = n.ReadNeedleBody(v.dataFile, version, offset+int64(NeedleHeaderSize), rest); err != nil {
- err = fmt.Errorf("cannot read needle body: %v", err)
- return
+ glog.V(0).Infof("cannot read needle body: %v", err)
+ //err = fmt.Errorf("cannot read needle body: %v", err)
+ //return
+ }
+ if n.DataSize >= n.Size {
+ // this should come from a bug reported on #87 and #93
+ // fixed in v0.69
+ // remove this whole "if" clause later, long after 0.69
+ oldRest, oldSize := rest, n.Size
+ padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
+ n.Size = 0
+ rest = n.Size + NeedleChecksumSize + padding
+ if rest%NeedlePaddingSize != 0 {
+ rest += (NeedlePaddingSize - rest%NeedlePaddingSize)
+ }
+ glog.V(4).Infof("Adjusting n.Size %d=>0 rest:%d=>%d %+v", oldSize, oldRest, rest, n)
}
}
if err = visitNeedle(n, offset); err != nil {
- return
+ glog.V(0).Infof("visit needle error: %v", err)
}
offset += int64(NeedleHeaderSize) + int64(rest)
+ glog.V(4).Infof("==> new entry offset %d", offset)
if n, rest, err = ReadNeedleHeader(v.dataFile, version, offset); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("cannot read needle header: %v", err)
}
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
}
return