aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store_ec.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/store_ec.go')
-rw-r--r--weed/storage/store_ec.go32
1 files changed, 17 insertions, 15 deletions
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index a915e1dbd..0126ad9d4 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -25,10 +25,10 @@ import (
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
collectionEcShardSize := make(map[string]int64)
- for _, location := range s.Locations {
+ for diskId, location := range s.Locations {
location.ecVolumesLock.RLock()
for _, ecShards := range location.ecVolumes {
- ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
+ ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage(uint32(diskId))...)
for _, ecShard := range ecShards.Shards {
collectionEcShardSize[ecShards.Collection] += ecShard.Size()
@@ -49,9 +49,9 @@ func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
}
func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) error {
- for _, location := range s.Locations {
+ for diskId, location := range s.Locations {
if ecVolume, err := location.LoadEcShard(collection, vid, shardId); err == nil {
- glog.V(0).Infof("MountEcShards %d.%d", vid, shardId)
+ glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId)
var shardBits erasure_coding.ShardBits
@@ -61,6 +61,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
DiskType: string(location.DiskType),
ExpireAtSec: ecVolume.ExpireAtSec,
+ DiskId: uint32(diskId),
}
return nil
} else if err == os.ErrNotExist {
@@ -75,7 +76,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {
- ecShard, found := s.findEcShard(vid, shardId)
+ diskId, ecShard, found := s.findEcShard(vid, shardId)
if !found {
return nil
}
@@ -86,26 +87,27 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
DiskType: string(ecShard.DiskType),
+ DiskId: diskId,
}
- for _, location := range s.Locations {
- if deleted := location.UnloadEcShard(vid, shardId); deleted {
- glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
- s.DeletedEcShardsChan <- message
- return nil
- }
+ location := s.Locations[diskId]
+
+ if deleted := location.UnloadEcShard(vid, shardId); deleted {
+ glog.V(0).Infof("UnmountEcShards %d.%d", vid, shardId)
+ s.DeletedEcShardsChan <- message
+ return nil
}
return fmt.Errorf("UnmountEcShards %d.%d not found on disk", vid, shardId)
}
-func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
- for _, location := range s.Locations {
+func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (diskId uint32, shard *erasure_coding.EcVolumeShard, found bool) {
+ for diskId, location := range s.Locations {
if v, found := location.FindEcShard(vid, shardId); found {
- return v, found
+ return uint32(diskId), v, found
}
}
- return nil, false
+ return 0, nil, false
}
func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {