aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/store.go7
-rw-r--r--weed/storage/volume.go6
-rw-r--r--weed/storage/volume_read_write.go23
3 files changed, 31 insertions, 5 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 7e5768417..0d4d6e916 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -221,7 +221,12 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
} else {
- glog.V(0).Infoln("volume", v.Id, "is expired.")
+ glog.V(0).Infoln("volume %d is expired", v.Id)
+ }
+ if v.lastIoError != nil {
+ deleteVids = append(deleteVids, v.Id)
+ } else {
+ glog.Warningf("volume %d has IO error", v.Id)
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index a03846d3d..c726e7f11 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -46,6 +46,8 @@ type Volume struct {
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation
+
+ lastIoError error
}
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
@@ -86,10 +88,10 @@ func (v *Volume) IndexFileName() (fileName string) {
func (v *Volume) FileName(ext string) (fileName string) {
switch ext {
case ".idx", ".cpx", ".ldb":
- return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))+ext
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext
}
// .dat, .cpd, .vif
- return VolumeFileName(v.dir, v.Collection, int(v.Id))+ext
+ return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext
}
func (v *Volume) Version() needle.Version {
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 6dc4cb4a5..c30abf237 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -19,6 +19,18 @@ var ErrorNotFound = errors.New("not found")
var ErrorDeleted = errors.New("already deleted")
var ErrorSizeMismatch = errors.New("size mismatch")
+func (v *Volume) checkReadWriteError(err error) {
+ if err == nil {
+ if v.lastIoError != nil {
+ v.lastIoError = nil
+ }
+ return
+ }
+ if err.Error() == "input/output error" {
+ v.lastIoError = err
+ }
+}
+
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
@@ -115,7 +127,9 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
// append to dat file
n.AppendAtNs = uint64(time.Now().UnixNano())
- if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
+ offset, size, _, err = n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
+ if err != nil {
return
}
@@ -179,7 +193,9 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
// append to dat file
n.AppendAtNs = uint64(time.Now().UnixNano())
- if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
+ offset, size, _, err = n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
+ if err != nil {
return
}
v.lastAppendAtNs = n.AppendAtNs
@@ -214,6 +230,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
offset, _, _, err := n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
if err != nil {
return size, err
}
@@ -252,6 +269,7 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
n.Data = nil
n.AppendAtNs = uint64(time.Now().UnixNano())
offset, _, _, err := n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
if err != nil {
return size, err
}
@@ -289,6 +307,7 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
}
+ v.checkReadWriteError(err)
if err != nil {
return 0, err
}