aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/disk_location.go65
-rw-r--r--weed/storage/disk_location_ec.go14
2 files changed, 62 insertions, 17 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 0536f085b..538f7469f 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -71,7 +71,6 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
-
}
}
}
@@ -116,28 +115,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.volumesLock.Lock()
- for k, v := range l.volumes {
- if v.Collection == collection {
- e = l.deleteVolumeById(k)
- if e != nil {
- l.volumesLock.Unlock()
- return
- }
- }
- }
+ delVolsMap := l.unmountVolumeByCollection(collection)
l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
- for k, v := range l.ecVolumes {
- if v.Collection == collection {
- e = l.deleteEcVolumeById(k)
- if e != nil {
- l.ecVolumesLock.Unlock()
- return
+ delEcVolsMap := l.unmountEcVolumeByCollection(collection)
+ l.ecVolumesLock.Unlock()
+
+ errChain := make(chan error, 2)
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ for _, v := range delVolsMap {
+ if err := v.Destroy(); err != nil {
+ errChain <- err
}
}
+ wg.Done()
+ }()
+
+ go func() {
+ for _, v := range delEcVolsMap {
+ v.Destroy()
+ }
+ wg.Done()
+ }()
+
+ go func() {
+ wg.Wait()
+ close(errChain)
+ }()
+
+ errBuilder := strings.Builder{}
+ for err := range errChain {
+ errBuilder.WriteString(err.Error())
+ errBuilder.WriteString("; ")
}
- l.ecVolumesLock.Unlock()
+ if errBuilder.Len() > 0 {
+ e = fmt.Errorf(errBuilder.String())
+ }
+
return
}
@@ -193,6 +210,20 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
return nil
}
+func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
+ deltaVols := make(map[needle.VolumeId]*Volume, 0)
+ for k, v := range l.volumes {
+ if v.Collection == collectionName && !v.isCompacting {
+ deltaVols[k] = v
+ }
+ }
+
+ for k, _ := range deltaVols {
+ delete(l.volumes, k)
+ }
+ return deltaVols
+}
+
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index ba0824c6d..f6c44e966 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
delete(l.ecVolumes, vid)
return
}
+
+func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume {
+ deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0)
+ for k, v := range l.ecVolumes {
+ if v.Collection == collectionName {
+ deltaVols[k] = v
+ }
+ }
+
+ for k, _ := range deltaVols {
+ delete(l.ecVolumes, k)
+ }
+ return deltaVols
+}