aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-01-05 19:52:17 -0800
committerChris Lu <chris.lu@gmail.com>2019-01-05 19:52:17 -0800
commit9383c91eb1ce9f20cad8706a70cec16ab15c453b (patch)
treedd1ec56800a5323ea0f78693a42ee073e2b94630
parentfe50224ea0cdd665617a1e0c5dd434c874de0790 (diff)
downloadseaweedfs-9383c91eb1ce9f20cad8706a70cec16ab15c453b.tar.xz
seaweedfs-9383c91eb1ce9f20cad8706a70cec16ab15c453b.zip
wait to read again if the volume is compacting
-rw-r--r--weed/storage/volume.go1
-rw-r--r--weed/storage/volume_read_write.go8
-rw-r--r--weed/storage/volume_vacuum.go2
3 files changed, 10 insertions, 1 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 17917cea8..8ca6d3ffa 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -16,6 +16,7 @@ type Volume struct {
Collection string
dataFile *os.File
nm NeedleMapper
+ compactingWg sync.WaitGroup
needleMapKind NeedleMapType
readOnly bool
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index e90e26144..8f628c178 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -12,6 +12,8 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
+var ErrorNotFound = errors.New("not found")
+
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
func (v *Volume) isFileUnchanged(n *Needle) bool {
@@ -134,7 +136,11 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
func (v *Volume) readNeedle(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset == 0 {
- return -1, errors.New("Not Found")
+ v.compactingWg.Wait()
+ nv, ok = v.nm.Get(n.Id)
+ if !ok || nv.Offset == 0 {
+ return -1, ErrorNotFound
+ }
}
if nv.Size == TombstoneFileSize {
return -1, errors.New("Already Deleted")
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 5e0b19b66..642114d01 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -43,6 +43,8 @@ func (v *Volume) commitCompact() error {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
+ v.compactingWg.Add(1)
+ defer v.compactingWg.Done()
v.nm.Close()
if err := v.dataFile.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)