aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-21 14:49:58 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-21 14:49:58 -0700
commitbafa95045b1d01a4594b96f00068c050baa796de (patch)
treebfc4118c2d9d757dc4e57c72c3ff38a14dcdf0ad
parent57df14f76febf5f94131be0b4bde833d45d8f37c (diff)
downloadseaweedfs-bafa95045b1d01a4594b96f00068c050baa796de.tar.xz
seaweedfs-bafa95045b1d01a4594b96f00068c050baa796de.zip
volume: deletion checks all disk locations
fix https://github.com/chrislusf/seaweedfs/issues/1283
-rw-r--r--weed/storage/disk_location.go6
-rw-r--r--weed/storage/store.go14
2 files changed, 14 insertions, 6 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 3c8a7b864..088763c45 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -167,7 +167,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
return
}
-func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
+func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
v, ok := l.volumes[vid]
if !ok {
return
@@ -176,6 +176,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
if e != nil {
return
}
+ found = true
delete(l.volumes, vid)
return
}
@@ -195,7 +196,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
if !ok {
return fmt.Errorf("Volume not found, VolumeId: %d", vid)
}
- return l.deleteVolumeById(vid)
+ _, err := l.deleteVolumeById(vid)
+ return err
}
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 64e437add..0fff80aa9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -221,8 +221,14 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
// delete expired volumes.
location.volumesLock.Lock()
for _, vid := range deleteVids {
- location.deleteVolumeById(vid)
- glog.V(0).Infoln("volume", vid, "is deleted.")
+ found, err := location.deleteVolumeById(vid)
+ if found {
+ if err == nil {
+ glog.V(0).Infof("volume %d is deleted", vid)
+ } else {
+ glog.V(0).Infof("delete volume %d: %v", vid, err)
+ }
+ }
}
location.volumesLock.Unlock()
}
@@ -355,7 +361,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
func (s *Store) DeleteVolume(i needle.VolumeId) error {
v := s.findVolume(i)
if v == nil {
- return nil
+ return fmt.Errorf("delete volume %d not found on disk", i)
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
@@ -365,7 +371,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
Ttl: v.Ttl.ToUint32(),
}
for _, location := range s.Locations {
- if error := location.deleteVolumeById(i); error == nil {
+ if found, error := location.deleteVolumeById(i); found && error == nil {
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil