diff options
| -rw-r--r-- | weed/shell/command_ec_decode.go | 5 | ||||
| -rw-r--r-- | weed/storage/disk_location.go | 65 | ||||
| -rw-r--r-- | weed/storage/disk_location_ec.go | 14 | ||||
| -rw-r--r-- | weed/storage/volume_create.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_create_linux.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_create_windows.go | 11 |
6 files changed, 81 insertions, 24 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 4ec1a7e8f..b49ab2c58 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -149,8 +149,9 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits for loc, ecIndexBits := range nodeToEcIndexBits { - if ecIndexBits.ShardIdCount() > maxShardCount { - maxShardCount = ecIndexBits.ShardIdCount() + toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount() + if toBeCopiedShardCount > maxShardCount { + maxShardCount = toBeCopiedShardCount targetNodeLocation = loc exisitngEcIndexBits = ecIndexBits } diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 0536f085b..538f7469f 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -71,7 +71,6 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne } else { glog.V(0).Infof("new volume %s error %s", name, e) } - } } } @@ -116,28 +115,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { l.volumesLock.Lock() - for k, v := range l.volumes { - if v.Collection == collection { - e = l.deleteVolumeById(k) - if e != nil { - l.volumesLock.Unlock() - return - } - } - } + delVolsMap := l.unmountVolumeByCollection(collection) l.volumesLock.Unlock() l.ecVolumesLock.Lock() - for k, v := range l.ecVolumes { - if v.Collection == collection { - e = l.deleteEcVolumeById(k) - if e != nil { - l.ecVolumesLock.Unlock() - return + delEcVolsMap := l.unmountEcVolumeByCollection(collection) + l.ecVolumesLock.Unlock() + + errChain := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(2) + go func() { + for _, v := range delVolsMap { + if err := v.Destroy(); err != nil { + errChain <- err } } + wg.Done() + }() + + go func() { + for _, v := range delEcVolsMap { + v.Destroy() + } + wg.Done() + }() + + go func() { + wg.Wait() + close(errChain) + }() + + errBuilder := strings.Builder{} + for err := range errChain { + errBuilder.WriteString(err.Error()) + errBuilder.WriteString("; ") } - l.ecVolumesLock.Unlock() + if errBuilder.Len() > 0 { + e = fmt.Errorf(errBuilder.String()) + } + return } @@ -193,6 +210,20 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { return nil } +func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { + deltaVols := make(map[needle.VolumeId]*Volume, 0) + for k, v := range l.volumes { + if v.Collection == collectionName && !v.isCompacting { + deltaVols[k] = v + } + } + + for k, _ := range deltaVols { + delete(l.volumes, k) + } + return deltaVols +} + func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { l.volumesLock.Lock() defer l.volumesLock.Unlock() diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index ba0824c6d..f6c44e966 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { delete(l.ecVolumes, vid) return } + +func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume { + deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0) + for k, v := range l.ecVolumes { + if v.Collection == collectionName { + deltaVols[k] = v + } + } + + for k, _ := range deltaVols { + delete(l.ecVolumes, k) + } + return deltaVols +} diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index ee41c50a9..ffcb246a4 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -11,8 +11,11 @@ import ( func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if e != nil { + return nil, e + } if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } - return backend.NewDiskFile(file), e + return backend.NewDiskFile(file), nil } diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index 5fafbe924..ee599ac32 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -12,9 +12,12 @@ import ( func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if e != nil { + return nil, e + } if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) } - return backend.NewDiskFile(file), e + return backend.NewDiskFile(file), nil } diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 9e5d8f87d..e1c0b961f 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -12,17 +12,22 @@ import ( ) func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { - if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } if memoryMapSizeMB > 0 { file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) - return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e + if e != nil { + return nil, e + } + return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), nil } else { file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) - return backend.NewDiskFile(file), e + if e != nil { + return nil, e + } + return backend.NewDiskFile(file), nil } } |
