aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-05 23:20:26 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-05 23:20:26 -0700
commitd344e0a035985ce7a28a6f7a4499199ef27aeda3 (patch)
tree77f094232aab9c2e767adb05f81c4e7a9453c2e2
parent450f4733cec38ec31c66b296749e2d0b14e91218 (diff)
downloadseaweedfs-d344e0a035985ce7a28a6f7a4499199ef27aeda3.tar.xz
seaweedfs-d344e0a035985ce7a28a6f7a4499199ef27aeda3.zip
fix ec related bugs
-rw-r--r--weed/server/volume_server_handlers_read.go2
-rw-r--r--weed/shell/command_ec_balance.go25
-rw-r--r--weed/shell/command_ec_common.go9
-rw-r--r--weed/shell/command_ec_rebuild.go2
-rw-r--r--weed/storage/disk_location.go4
-rw-r--r--weed/storage/disk_location_ec.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume.go4
-rw-r--r--weed/storage/store_ec.go2
-rw-r--r--weed/topology/topology.go10
9 files changed, 51 insertions, 9 deletions
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 839e17fbe..9bc436239 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -76,7 +76,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s error: %v", r.URL.Path, err)
+ glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 424b63d9d..4edf94711 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -199,6 +199,7 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
+ deleteEcVolumeShards(ecNode, vid, duplicatedShardIds)
ecNode.freeEcSlot++
}
}
@@ -273,3 +274,27 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
return 0
}
+
+func addEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32){
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ for _, shardId := range shardIds{
+ shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).AddShardId(erasure_coding.ShardId(shardId)))
+ }
+ }
+ }
+
+}
+
+func deleteEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, shardIds []uint32){
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ for _, shardId := range shardIds{
+ shardInfo.EcIndexBits = uint32(erasure_coding.ShardBits(shardInfo.EcIndexBits).RemoveShardId(erasure_coding.ShardId(shardId)))
+ }
+ }
+ }
+
+}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 4c53ba43b..041715908 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -35,7 +35,14 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist
}
// ask source node to delete the shard, and maybe the ecx file
- return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ deleteEcVolumeShards(existingLocation, vid, copiedShardIds)
+
+ return nil
}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 9a5ea3ca9..20929d76c 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -163,6 +163,8 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
return err
}
+ addEcVolumeShards(rebuilder, volumeId, generatedShardIds)
+
return nil
}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 234d1a5f4..e61623fc7 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -223,8 +223,8 @@ func (l *DiskLocation) Close() {
l.Unlock()
l.ecVolumesLock.Lock()
- for _, shards := range l.ecVolumes {
- shards.Close()
+ for _, ecVolume := range l.ecVolumes {
+ ecVolume.Close()
}
l.ecVolumesLock.Unlock()
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 7fb5351a4..ba0824c6d 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -87,8 +87,8 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
if len(ecVolume.Shards) == 0 {
delete(l.ecVolumes, vid)
+ ecVolume.Close()
}
- ecVolume.Close()
return true
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 17e7f2935..aea53f36e 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -155,7 +155,7 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version
// find the needle from ecx file
offset, size, err = ev.findNeedleFromEcx(n.Id)
if err != nil {
- return types.Offset{}, 0, nil, err
+ return types.Offset{}, 0, nil, fmt.Errorf("findNeedleFromEcx: %v", err)
}
shard := ev.Shards[0]
@@ -173,7 +173,7 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off
for l < h {
m := (l + h) / 2
if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, 0, err
+ return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err)
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 43fb62ba1..408436376 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -118,7 +118,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n, version)
if err != nil {
- return 0, err
+ return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index c6fdd3861..aa01190c9 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -78,7 +78,7 @@ func (t *Topology) Leader() (string, error) {
return l, nil
}
-func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
@@ -91,6 +91,14 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) []*DataNode {
return c.(*Collection).Lookup(vid)
}
}
+
+ if locations, found := t.LookupEcShards(vid); found {
+ for _, loc := range locations.Locations {
+ dataNodes = append(dataNodes, loc...)
+ }
+ return dataNodes
+ }
+
return nil
}