aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/disk_location_ec.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/disk_location_ec.go')
-rw-r--r--weed/storage/disk_location_ec.go85
1 files changed, 71 insertions, 14 deletions
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 3b2f1ec02..05acbb98b 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -16,26 +16,83 @@ var (
re = regexp.MustCompile("\\.ec[0-9][0-9]")
)
-func (l *DiskLocation) loadEcShards(baseName string, shards []string, collection string, vid needle.VolumeId) (err error){
+func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
+ l.ecShardsLock.RLock()
+ defer l.ecShardsLock.RUnlock()
- for _, shard := range shards{
+ ecShards, ok := l.ecShards[vid]
+ if !ok {
+ return nil, false
+ }
+ for _, ecShard := range ecShards {
+ if ecShard.ShardId == shardId {
+ return ecShard, true
+ }
+ }
+ return nil, false
+}
+
+func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) {
+
+ ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId)
+ if err != nil {
+ return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
+ }
+ l.ecShardsLock.Lock()
+ l.ecShards[vid] = append(l.ecShards[vid], ecVolumeShard)
+ l.ecShardsLock.Unlock()
+
+ return nil
+}
+
+func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool {
+
+ l.ecShardsLock.Lock()
+ defer l.ecShardsLock.Unlock()
+
+ vidShards, found := l.ecShards[vid]
+ if !found {
+ return false
+ }
+ shardIndex := -1
+ for i, shard := range vidShards {
+ if shard.ShardId == shardId {
+ shardIndex = i
+ break
+ }
+ }
+ if shardIndex < 0 {
+ return false
+ }
+
+ if len(vidShards) == 1 {
+ delete(l.ecShards, vid)
+ return true
+ }
+
+ l.ecShards[vid] = append(vidShards[:shardIndex], vidShards[shardIndex+1:]...)
+
+ return true
+}
+
+func (l *DiskLocation) loadEcShards(shards []string, collection string, vid needle.VolumeId) (err error) {
+
+ for _, shard := range shards {
shardId, err := strconv.ParseInt(path.Ext(shard)[3:], 10, 64)
if err != nil {
return fmt.Errorf("failed to parse ec shard name %v: %v", shard, err)
}
- ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, int(shardId))
+
+ err = l.LoadEcShard(collection, vid, erasure_coding.ShardId(shardId))
if err != nil {
- return fmt.Errorf("failed to create ec shard %v: %v", shard, err)
+ return fmt.Errorf("failed to load ec shard %v: %v", shard, err)
}
- l.ecShardsLock.Lock()
- l.ecShards[vid] = append(l.ecShards[vid], ecVolumeShard)
- l.ecShardsLock.Unlock()
}
return nil
}
-func (l *DiskLocation) loadAllEcShards() (err error){
+func (l *DiskLocation) loadAllEcShards() (err error) {
fileInfos, err := ioutil.ReadDir(l.Directory)
if err != nil {
@@ -48,8 +105,8 @@ func (l *DiskLocation) loadAllEcShards() (err error){
var sameVolumeShards []string
var prevVolumeId needle.VolumeId
- for _, fileInfo := range fileInfos{
- if fileInfo.IsDir(){
+ for _, fileInfo := range fileInfos {
+ if fileInfo.IsDir() {
continue
}
ext := path.Ext(fileInfo.Name())
@@ -61,18 +118,18 @@ func (l *DiskLocation) loadAllEcShards() (err error){
continue
}
- if re.MatchString(ext){
+ if re.MatchString(ext) {
if prevVolumeId == 0 || volumeId == prevVolumeId {
sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
- }else{
+ } else {
sameVolumeShards = []string{fileInfo.Name()}
}
prevVolumeId = volumeId
continue
}
- if ext == ".ecx" && volumeId == prevVolumeId{
- if err = l.loadEcShards(baseName, sameVolumeShards, collection, volumeId);err!=nil{
+ if ext == ".ecx" && volumeId == prevVolumeId {
+ if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
}
prevVolumeId = volumeId