aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_ec_decode.go5
-rw-r--r--weed/storage/disk_location.go65
-rw-r--r--weed/storage/disk_location_ec.go14
-rw-r--r--weed/storage/volume_create.go5
-rw-r--r--weed/storage/volume_create_linux.go5
-rw-r--r--weed/storage/volume_create_windows.go11
6 files changed, 81 insertions, 24 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 4ec1a7e8f..b49ab2c58 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -149,8 +149,9 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
for loc, ecIndexBits := range nodeToEcIndexBits {
- if ecIndexBits.ShardIdCount() > maxShardCount {
- maxShardCount = ecIndexBits.ShardIdCount()
+ toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
+ if toBeCopiedShardCount > maxShardCount {
+ maxShardCount = toBeCopiedShardCount
targetNodeLocation = loc
exisitngEcIndexBits = ecIndexBits
}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 0536f085b..538f7469f 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -71,7 +71,6 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
} else {
glog.V(0).Infof("new volume %s error %s", name, e)
}
-
}
}
}
@@ -116,28 +115,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
l.volumesLock.Lock()
- for k, v := range l.volumes {
- if v.Collection == collection {
- e = l.deleteVolumeById(k)
- if e != nil {
- l.volumesLock.Unlock()
- return
- }
- }
- }
+ delVolsMap := l.unmountVolumeByCollection(collection)
l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
- for k, v := range l.ecVolumes {
- if v.Collection == collection {
- e = l.deleteEcVolumeById(k)
- if e != nil {
- l.ecVolumesLock.Unlock()
- return
+ delEcVolsMap := l.unmountEcVolumeByCollection(collection)
+ l.ecVolumesLock.Unlock()
+
+ errChain := make(chan error, 2)
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ for _, v := range delVolsMap {
+ if err := v.Destroy(); err != nil {
+ errChain <- err
}
}
+ wg.Done()
+ }()
+
+ go func() {
+ for _, v := range delEcVolsMap {
+ v.Destroy()
+ }
+ wg.Done()
+ }()
+
+ go func() {
+ wg.Wait()
+ close(errChain)
+ }()
+
+ errBuilder := strings.Builder{}
+ for err := range errChain {
+ errBuilder.WriteString(err.Error())
+ errBuilder.WriteString("; ")
}
- l.ecVolumesLock.Unlock()
+ if errBuilder.Len() > 0 {
+ e = fmt.Errorf(errBuilder.String())
+ }
+
return
}
@@ -193,6 +210,20 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
return nil
}
+func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
+ deltaVols := make(map[needle.VolumeId]*Volume, 0)
+ for k, v := range l.volumes {
+ if v.Collection == collectionName && !v.isCompacting {
+ deltaVols[k] = v
+ }
+ }
+
+ for k, _ := range deltaVols {
+ delete(l.volumes, k)
+ }
+ return deltaVols
+}
+
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
l.volumesLock.Lock()
defer l.volumesLock.Unlock()
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index ba0824c6d..f6c44e966 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
delete(l.ecVolumes, vid)
return
}
+
+func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume {
+ deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0)
+ for k, v := range l.ecVolumes {
+ if v.Collection == collectionName {
+ deltaVols[k] = v
+ }
+ }
+
+ for k, _ := range deltaVols {
+ delete(l.ecVolumes, k)
+ }
+ return deltaVols
+}
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
index ee41c50a9..ffcb246a4 100644
--- a/weed/storage/volume_create.go
+++ b/weed/storage/volume_create.go
@@ -11,8 +11,11 @@ import (
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+ if e != nil {
+ return nil, e
+ }
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
- return backend.NewDiskFile(file), e
+ return backend.NewDiskFile(file), nil
}
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go
index 5fafbe924..ee599ac32 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/volume_create_linux.go
@@ -12,9 +12,12 @@ import (
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+ if e != nil {
+ return nil, e
+ }
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
- return backend.NewDiskFile(file), e
+ return backend.NewDiskFile(file), nil
}
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go
index 9e5d8f87d..e1c0b961f 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/volume_create_windows.go
@@ -12,17 +12,22 @@ import (
)
func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
-
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
if memoryMapSizeMB > 0 {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
- return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e
+ if e != nil {
+ return nil, e
+ }
+ return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), nil
} else {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
- return backend.NewDiskFile(file), e
+ if e != nil {
+ return nil, e
+ }
+ return backend.NewDiskFile(file), nil
}
}