From 228850d58888174d5f0d84c7f0f9506fb4360176 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 24 May 2019 11:52:23 -0700 Subject: shard id starts from zero --- weed/shell/command_ec_encode.go | 78 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 weed/shell/command_ec_encode.go (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go new file mode 100644 index 000000000..80a0ccf5c --- /dev/null +++ b/weed/shell/command_ec_encode.go @@ -0,0 +1,78 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + commands = append(commands, &commandEcEncode{}) +} + +type commandEcEncode struct { +} + +func (c *commandEcEncode) Name() string { + return "ec.encode" +} + +func (c *commandEcEncode) Help() string { + return `apply erasure coding to a volume + + This command will: + 1. freeze one volume + 2. apply erasure coding to the volume + 3. move the encoded shards to multiple volume servers + + The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford + to lose 4 volume servers. + + If the number of volumes are not high, the worst case is that you only have 4 volume servers, + and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server. + + If you only have less than 4 volume servers, with erasure coding, at least you can afford to + have 4 corrupted shard files. + +` +} + +func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { + + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := encodeCommand.Int("vid", 0, "the volume id") + if err = encodeCommand.Parse(args); err != nil { + return nil + } + + ctx := context.Background() + + locations := commandEnv.masterClient.GetLocations(uint32(*volumeId)) + + if len(locations) == 0 { + return fmt.Errorf("volume %d not found", *volumeId) + } + + err = generateEcSlices(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url) + + return err +} + +func generateEcSlices(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error { + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcGenerateSlices(ctx, &volume_server_pb.VolumeEcGenerateSlicesRequest{ + VolumeId: uint32(volumeId), + }) + return genErr + }) + + return err + +} -- cgit v1.2.3 From f0e6574d5ed03446b9b221653b20618c0e11b381 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 25 May 2019 02:02:44 -0700 Subject: allocate ec shards to volume servers --- weed/shell/command_ec_encode.go | 154 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 80a0ccf5c..4647c2507 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -5,10 +5,15 @@ import ( "flag" "fmt" "io" + "sort" + "sync" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" 
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/wdclient" "google.golang.org/grpc" ) @@ -53,18 +58,28 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr ctx := context.Background() + // find volume location locations := commandEnv.masterClient.GetLocations(uint32(*volumeId)) - if len(locations) == 0 { return fmt.Errorf("volume %d not found", *volumeId) } - err = generateEcSlices(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url) + // generate ec shards + err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url) + if err != nil { + return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + } + + // balance the ec shards to current cluster + err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), locations[0]) + if err != nil { + return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + } return err } -func generateEcSlices(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error { +func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error { err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcGenerateSlices(ctx, &volume_server_pb.VolumeEcGenerateSlicesRequest{ @@ -76,3 +91,136 @@ func generateEcSlices(ctx context.Context, grpcDialOption grpc.DialOption, volum return err } + +func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { + + // list all possible locations + var resp *master_pb.VolumeListResponse + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + // find out all volume servers with one volume slot left. + var allDataNodes []*master_pb.DataNodeInfo + var totalFreeEcSlots int + eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { + if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { + allDataNodes = append(allDataNodes, dn) + totalFreeEcSlots += freeEcSlots + } + }) + if totalFreeEcSlots < erasure_coding.TotalShardsCount { + return fmt.Errorf("not enough free ec shard slots. 
only %d left", totalFreeEcSlots) + } + sort.Slice(allDataNodes, func(i, j int) bool { + return countFreeShardSlots(allDataNodes[j]) < countFreeShardSlots(allDataNodes[i]) + }) + if len(allDataNodes) > erasure_coding.TotalShardsCount { + allDataNodes = allDataNodes[:erasure_coding.TotalShardsCount] + } + + // calculate how many shards to allocate for these servers + allocated := balancedEcDistribution(allDataNodes) + + // ask the data nodes to copy from the source volume server + err = parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, existingLocation) + if err != nil { + return nil + } + + // ask the source volume server to clean up copied ec shards + + // ask the source volume server to delete the original volume + + return err + +} + +func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, + targetServers []*master_pb.DataNodeInfo, allocated []int, + volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { + + // parallelize + var wg sync.WaitGroup + startFromShardId := 0 + for i, server := range targetServers { + if allocated[i] <= 0 { + continue + } + + wg.Add(1) + go func(server *master_pb.DataNodeInfo, startFromShardId int, shardCount int) { + defer wg.Done() + copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, existingLocation) + if copyErr != nil { + err = copyErr + } + }(server, startFromShardId, allocated[i]) + startFromShardId += allocated[i] + } + wg.Wait() + + return err +} + +func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, + targetServer *master_pb.DataNodeInfo, startFromShardId int, shardCount int, + volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { + + if targetServer.Id == existingLocation.Url { + return nil + } + + for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { + fmt.Printf("copy %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) + } + + return nil +} +func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []int) { + freeSlots := make([]int, len(servers)) + allocated = make([]int, len(servers)) + for i, server := range servers { + freeSlots[i] = countFreeShardSlots(server) + } + allocatedCount := 0 + for allocatedCount < erasure_coding.TotalShardsCount { + for i, _ := range servers { + if freeSlots[i]-allocated[i] > 0 { + allocated[i] += 1 + allocatedCount += 1 + } + if allocatedCount >= erasure_coding.TotalShardsCount { + break + } + } + } + + return allocated +} + +func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) { + for _, dc := range topo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + fn(dn) + } + } + } +} + +func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { + for _, ecShardInfo := range ecShardInfos { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + count += shardBits.ShardIdCount() + } + return +} + +func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { + return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) +} -- cgit v1.2.3 From 41e8ae61f80275a2d1ba49f36553f798cf8efe3a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 25 May 2019 14:02:06 -0700 Subject: generate, balance, delete copied shards, delete old volume --- weed/shell/command_ec_encode.go | 109 
++++++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 26 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 4647c2507..c9ec2fcd2 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -51,7 +51,8 @@ func (c *commandEcEncode) Help() string { func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) - volumeId := encodeCommand.Int("vid", 0, "the volume id") + volumeId := encodeCommand.Int("volumeId", 0, "the volume id") + collection := encodeCommand.String("collection", "", "the collection name") if err = encodeCommand.Parse(args); err != nil { return nil } @@ -65,13 +66,13 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), locations[0].Url) + err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url) if err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) } // balance the ec shards to current cluster - err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), locations[0]) + err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations) if err != nil { return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) } @@ -79,11 +80,12 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr return err } -func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) error { +func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcGenerateSlices(ctx, &volume_server_pb.VolumeEcGenerateSlicesRequest{ - VolumeId: uint32(volumeId), + _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: uint32(volumeId), + Collection: collection, }) return genErr }) @@ -92,7 +94,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum } -func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { +func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { // list all possible locations var resp *master_pb.VolumeListResponse @@ -106,7 +108,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl // find out all volume servers with one volume slot left. 
var allDataNodes []*master_pb.DataNodeInfo - var totalFreeEcSlots int + var totalFreeEcSlots uint32 eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { allDataNodes = append(allDataNodes, dn) @@ -127,63 +129,118 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl allocated := balancedEcDistribution(allDataNodes) // ask the data nodes to copy from the source volume server - err = parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, existingLocation) + copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, collection, existingLocations[0]) if err != nil { return nil } // ask the source volume server to clean up copied ec shards + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds) + if err != nil { + return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err) + } // ask the source volume server to delete the original volume + for _, location := range existingLocations { + err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url) + if err != nil { + return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) + } + } return err } func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*master_pb.DataNodeInfo, allocated []int, - volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { + targetServers []*master_pb.DataNodeInfo, allocated []uint32, + volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize + shardIdChan := make(chan []uint32, len(targetServers)) var wg sync.WaitGroup - startFromShardId := 0 + startFromShardId := uint32(0) for i, server := range targetServers { if allocated[i] <= 0 { continue } wg.Add(1) - go func(server *master_pb.DataNodeInfo, startFromShardId int, shardCount int) { + go func(server *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32) { defer wg.Done() - copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, existingLocation) + copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, + startFromShardId, shardCount, volumeId, collection, existingLocation) if copyErr != nil { err = copyErr + } else { + shardIdChan <- copiedShardIds } }(server, startFromShardId, allocated[i]) startFromShardId += allocated[i] } wg.Wait() + close(shardIdChan) - return err + if err != nil { + return nil, err + } + + for shardIds := range shardIdChan { + actuallyCopied = append(actuallyCopied, shardIds...) 
+ } + + return } func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServer *master_pb.DataNodeInfo, startFromShardId int, shardCount int, - volumeId needle.VolumeId, existingLocation wdclient.Location) (err error) { + targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32, + volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { if targetServer.Id == existingLocation.Url { - return nil + return nil, nil } for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { fmt.Printf("copy %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) + copiedShardIds = append(copiedShardIds, shardId) } - return nil + err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + EcIndexes: copiedShardIds, + SourceDataNode: existingLocation.Url, + }) + return copyErr + }) + + if err != nil { + return + } + + return } -func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []int) { - freeSlots := make([]int, len(servers)) - allocated = make([]int, len(servers)) + +func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, + volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error { + + shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount + + return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ + VolumeId: uint32(volumeId), + EcIndexes: toBeDeletedShardIds, + ShouldDeleteEcx: shouldDeleteEcx, + }) + return deleteErr + }) + +} + +func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []uint32) { + freeSlots := make([]uint32, len(servers)) + allocated = make([]uint32, len(servers)) for i, server := range servers { freeSlots[i] = countFreeShardSlots(server) } @@ -213,14 +270,14 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo) } } -func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { +func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count uint32) { for _, ecShardInfo := range ecShardInfos { shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - count += shardBits.ShardIdCount() + count += uint32(shardBits.ShardIdCount()) } return } -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { - return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) +func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count uint32) { + return uint32(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) } -- cgit v1.2.3 From db94a41f9e28e620b7527d9cca51f9a052a81184 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 25 May 2019 23:23:19 -0700 Subject: mount/unmount ec shards --- weed/shell/command_ec_encode.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index c9ec2fcd2..e0d028392 100644 --- 
a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -196,23 +196,36 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { - if targetServer.Id == existingLocation.Url { - return nil, nil - } - for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { - fmt.Printf("copy %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) + fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) copiedShardIds = append(copiedShardIds, shardId) } err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: uint32(volumeId), - Collection: collection, - EcIndexes: copiedShardIds, - SourceDataNode: existingLocation.Url, + + if targetServer.Id != existingLocation.Url { + + _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + EcIndexes: copiedShardIds, + SourceDataNode: existingLocation.Url, + }) + if copyErr != nil { + return copyErr + } + } + + _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + EcIndexes: copiedShardIds, }) - return copyErr + if mountErr != nil { + return mountErr + } + + return nil }) if err != nil { -- cgit v1.2.3 From b4b407e4038943ca5b7dc440d2848f23c11b73ca Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 27 May 2019 11:59:03 -0700 Subject: add grpc ec shard read --- weed/shell/command_ec_encode.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e0d028392..499c8a32e 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -208,7 +208,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, - EcIndexes: copiedShardIds, + ShardIds: copiedShardIds, SourceDataNode: existingLocation.Url, }) if copyErr != nil { @@ -219,7 +219,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, - EcIndexes: copiedShardIds, + ShardIds: copiedShardIds, }) if mountErr != nil { return mountErr @@ -243,7 +243,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), - EcIndexes: toBeDeletedShardIds, + ShardIds: toBeDeletedShardIds, ShouldDeleteEcx: shouldDeleteEcx, }) return deleteErr -- cgit v1.2.3 From 
3f9ecee40fd469f9669686752ea8c6b2b8090596 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 28 May 2019 21:29:07 -0700 Subject: working with reading remote intervals --- weed/shell/command_ec_encode.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 499c8a32e..ac42b520d 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -196,9 +197,10 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { + var shardIdsToCopy []uint32 for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) - copiedShardIds = append(copiedShardIds, shardId) + shardIdsToCopy = append(shardIdsToCopy, shardId) } err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -208,7 +210,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: copiedShardIds, + ShardIds: shardIdsToCopy, SourceDataNode: existingLocation.Url, }) if copyErr != nil { @@ -219,12 +221,17 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, - ShardIds: copiedShardIds, + ShardIds: shardIdsToCopy, }) if mountErr != nil { return mountErr } + if targetServer.Id != existingLocation.Url { + copiedShardIds = shardIdsToCopy + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds) + } + return nil }) @@ -243,7 +250,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), - ShardIds: toBeDeletedShardIds, + ShardIds: toBeDeletedShardIds, ShouldDeleteEcx: shouldDeleteEcx, }) return deleteErr -- cgit v1.2.3 From 866197eee3ba3348ab4a1e0cc3f60338f3b9c3a6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 30 May 2019 01:38:59 -0700 Subject: print out the ec balancing plan --- weed/shell/command_ec_encode.go | 102 +++++++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 39 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index ac42b520d..817529478 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -97,40 +97,24 @@ func generateEcShards(ctx 
context.Context, grpcDialOption grpc.DialOption, volum func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { - // list all possible locations - var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) - return err - }) + allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) if err != nil { return err } - // find out all volume servers with one volume slot left. - var allDataNodes []*master_pb.DataNodeInfo - var totalFreeEcSlots uint32 - eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { - if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { - allDataNodes = append(allDataNodes, dn) - totalFreeEcSlots += freeEcSlots - } - }) if totalFreeEcSlots < erasure_coding.TotalShardsCount { return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots) } - sort.Slice(allDataNodes, func(i, j int) bool { - return countFreeShardSlots(allDataNodes[j]) < countFreeShardSlots(allDataNodes[i]) - }) - if len(allDataNodes) > erasure_coding.TotalShardsCount { - allDataNodes = allDataNodes[:erasure_coding.TotalShardsCount] + allocatedDataNodes := allEcNodes + if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { + allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] } // calculate how many shards to allocate for these servers - allocated := balancedEcDistribution(allDataNodes) + allocated := balancedEcDistribution(allocatedDataNodes) // ask the data nodes to copy from the source volume server - copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allDataNodes, allocated, volumeId, collection, existingLocations[0]) + copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0]) if err != nil { return nil } @@ -154,7 +138,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl } func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*master_pb.DataNodeInfo, allocated []uint32, + targetServers []*EcNode, allocated []int, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize @@ -167,7 +151,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia } wg.Add(1) - go func(server *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32) { + go func(server *EcNode, startFromShardId uint32, shardCount int) { defer wg.Done() copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, collection, existingLocation) @@ -175,9 +159,10 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia err = copyErr } else { shardIdChan <- copiedShardIds + server.freeEcSlot -= len(copiedShardIds) } }(server, startFromShardId, allocated[i]) - startFromShardId += allocated[i] + startFromShardId += uint32(allocated[i]) } wg.Wait() close(shardIdChan) @@ -194,18 +179,18 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia } func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServer 
*master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32, + targetServer *EcNode, startFromShardId uint32, shardCount int, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { var shardIdsToCopy []uint32 - for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ { - fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id) + for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { + fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id) shardIdsToCopy = append(shardIdsToCopy, shardId) } - err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - if targetServer.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation.Url { _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), @@ -227,7 +212,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di return mountErr } - if targetServer.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation.Url { copiedShardIds = shardIdsToCopy glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds) } @@ -258,11 +243,11 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt } -func balancedEcDistribution(servers []*master_pb.DataNodeInfo) (allocated []uint32) { - freeSlots := make([]uint32, len(servers)) - allocated = make([]uint32, len(servers)) +func balancedEcDistribution(servers []*EcNode) (allocated []int) { + freeSlots := make([]int, len(servers)) + allocated = make([]int, len(servers)) for i, server := range servers { - freeSlots[i] = countFreeShardSlots(server) + freeSlots[i] = countFreeShardSlots(server.info) } allocatedCount := 0 for allocatedCount < erasure_coding.TotalShardsCount { @@ -290,14 +275,53 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo) } } -func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count uint32) { +func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { for _, ecShardInfo := range ecShardInfos { shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - count += uint32(shardBits.ShardIdCount()) + count += shardBits.ShardIdCount() } return } -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count uint32) { - return uint32(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) +func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { + return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) +} + +type EcNode struct { + info *master_pb.DataNodeInfo + freeEcSlot int +} + +func sortEcNodes(ecNodes []*EcNode) { + sort.Slice(ecNodes, func(i, j int) bool { + return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot + }) +} + +func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + + // list all possible locations + var resp *master_pb.VolumeListResponse + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, 
&master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return nil, 0, err + } + + // find out all volume servers with one slot left. + eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { + if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { + ecNodes = append(ecNodes, &EcNode{ + info: dn, + freeEcSlot: int(freeEcSlots), + }) + totalFreeEcSlots += freeEcSlots + } + }) + + sortEcNodes(ecNodes) + + return } -- cgit v1.2.3 From a72cef3c429202aff5c04ed2c7b9296b2351174f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 31 May 2019 15:48:40 -0700 Subject: encode by collection --- weed/shell/command_ec_encode.go | 66 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 7 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 817529478..e69d1bfc7 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -59,26 +59,47 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } ctx := context.Background() + vid := needle.VolumeId(*volumeId) + // volumeId is provided + if vid != 0 { + return doEcEncode(ctx, commandEnv, *collection, vid) + } + + // apply to all volumes in the collection + volumeIds, err := collectVolumeByCollection(ctx, commandEnv, *collection) + if err != nil { + return err + } + for _, vid := range volumeIds { + if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.masterClient.GetLocations(uint32(*volumeId)) + locations := commandEnv.masterClient.GetLocations(uint32(vid)) if len(locations) == 0 { - return fmt.Errorf("volume %d not found", *volumeId) + return fmt.Errorf("volume %d not found", vid) } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(*volumeId), *collection, locations[0].Url) + err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { - return fmt.Errorf("generate ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } // balance the ec shards to current cluster - err = balanceEcShards(ctx, commandEnv, needle.VolumeId(*volumeId), *collection, locations) + err = balanceEcShards(ctx, commandEnv, vid, collection, locations) if err != nil { - return fmt.Errorf("balance ec shards for volume %d on %s: %v", *volumeId, locations[0].Url, err) + return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } - return err + return nil } func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { @@ -325,3 +346,34 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN return } + +func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, selectedCollection string) (vids []needle.VolumeId, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + vidMap := 
make(map[uint32]bool) + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } + } + } + } + } + + for vid, _ := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} -- cgit v1.2.3 From f919d0235cff244941e4e9f8c51bf89224d3d0ab Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 31 May 2019 23:41:17 -0700 Subject: ec encode volumes quiet for a period of time --- weed/shell/command_ec_encode.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e69d1bfc7..7ecd7bb8c 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -7,6 +7,7 @@ import ( "io" "sort" "sync" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -54,6 +55,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") collection := encodeCommand.String("collection", "", "the collection name") + quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") if err = encodeCommand.Parse(args); err != nil { return nil } @@ -67,7 +69,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } // apply to all volumes in the collection - volumeIds, err := collectVolumeByCollection(ctx, commandEnv, *collection) + volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod) if err != nil { return err } @@ -347,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN return } -func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, selectedCollection string) (vids []needle.VolumeId, err error) { +func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { @@ -358,12 +360,15 @@ func collectVolumeByCollection(ctx context.Context, commandEnv *commandEnv, sele return } + quietSeconds := int64((quietPeriod * time.Second).Seconds()) + nowUnixSeconds := time.Now().Unix() + vidMap := make(map[uint32]bool) for _, dc := range resp.TopologyInfo.DataCenterInfos { for _, r := range dc.RackInfos { for _, dn := range r.DataNodeInfos { for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { vidMap[v.Id] = true } } -- cgit v1.2.3 From ba18314aab2531ff6d7223cb2cf976dc01aaa735 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 1 Jun 2019 01:41:22 -0700 Subject: ec shard delete also check ec volumes, in addition to volumes --- weed/shell/command_ec_encode.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 7ecd7bb8c..14d1ae96b 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -69,7 +69,7 @@ 
func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } // apply to all volumes in the collection - volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod) if err != nil { return err } @@ -143,7 +143,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds) + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err) } @@ -177,7 +177,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia go func(server *EcNode, startFromShardId uint32, shardCount int) { defer wg.Done() copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, - startFromShardId, shardCount, volumeId, collection, existingLocation) + startFromShardId, shardCount, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr } else { @@ -203,23 +203,23 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, targetServer *EcNode, startFromShardId uint32, shardCount int, - volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { var shardIdsToCopy []uint32 for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { - fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id) + fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id) shardIdsToCopy = append(shardIdsToCopy, shardId) } err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - if targetServer.info.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation { _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, - SourceDataNode: existingLocation.Url, + SourceDataNode: existingLocation, }) if copyErr != nil { return copyErr @@ -235,9 +235,9 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di return mountErr } - if targetServer.info.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation { copiedShardIds = shardIdsToCopy - glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds) + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) } return nil @@ -251,11 +251,11 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di } func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error { + volumeId needle.VolumeId, sourceLocation 
string, toBeDeletedShardIds []uint32) error { shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount - return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), ShardIds: toBeDeletedShardIds, @@ -349,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN return } -func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { -- cgit v1.2.3 From 7e80b2b8823a9bb8bac58100a76d6a5825c94be4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Jun 2019 02:26:31 -0700 Subject: fix multiple bugs --- weed/shell/command_ec_encode.go | 161 +++++----------------------------------- 1 file changed, 19 insertions(+), 142 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 14d1ae96b..94265a874 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -5,11 +5,9 @@ import ( "flag" "fmt" "io" - "sort" "sync" "time" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -73,6 +71,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr if err != nil { return err } + fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { return err @@ -96,9 +95,9 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, } // balance the ec shards to current cluster - err = balanceEcShards(ctx, commandEnv, vid, collection, locations) + err = spreadEcShards(ctx, commandEnv, vid, collection, locations) if err != nil { - return fmt.Errorf("balance ec shards for volume %d on %s: %v", vid, locations[0].Url, err) + return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err) } return nil @@ -118,7 +117,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum } -func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) if err != nil { @@ -139,13 +138,19 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl // ask the data nodes to copy from the source volume server copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, 
volumeId, collection, existingLocations[0]) if err != nil { - return nil + return err + } + + // unmount the to be deleted shards + err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + if err != nil { + return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { - return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err) + return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } // ask the source volume server to delete the original volume @@ -176,7 +181,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia wg.Add(1) go func(server *EcNode, startFromShardId uint32, shardCount int) { defer wg.Done() - copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, + copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, startFromShardId, shardCount, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr @@ -201,81 +206,12 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia return } -func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServer *EcNode, startFromShardId uint32, shardCount int, - volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { - - var shardIdsToCopy []uint32 - for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { - fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id) - shardIdsToCopy = append(shardIdsToCopy, shardId) - } - - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - - if targetServer.info.Id != existingLocation { - - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ - VolumeId: uint32(volumeId), - Collection: collection, - ShardIds: shardIdsToCopy, - SourceDataNode: existingLocation, - }) - if copyErr != nil { - return copyErr - } - } - - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ - VolumeId: uint32(volumeId), - Collection: collection, - ShardIds: shardIdsToCopy, - }) - if mountErr != nil { - return mountErr - } - - if targetServer.info.Id != existingLocation { - copiedShardIds = shardIdsToCopy - glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) - } - - return nil - }) - - if err != nil { - return - } - - return -} - -func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { - - shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount - - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, 
&volume_server_pb.VolumeEcShardsDeleteRequest{ - VolumeId: uint32(volumeId), - ShardIds: toBeDeletedShardIds, - ShouldDeleteEcx: shouldDeleteEcx, - }) - return deleteErr - }) - -} - func balancedEcDistribution(servers []*EcNode) (allocated []int) { - freeSlots := make([]int, len(servers)) allocated = make([]int, len(servers)) - for i, server := range servers { - freeSlots[i] = countFreeShardSlots(server.info) - } allocatedCount := 0 for allocatedCount < erasure_coding.TotalShardsCount { - for i, _ := range servers { - if freeSlots[i]-allocated[i] > 0 { + for i, server := range servers { + if server.freeEcSlot-allocated[i] > 0 { allocated[i] += 1 allocatedCount += 1 } @@ -288,67 +224,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) { return allocated } -func eachDataNode(topo *master_pb.TopologyInfo, fn func(*master_pb.DataNodeInfo)) { - for _, dc := range topo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dn := range rack.DataNodeInfos { - fn(dn) - } - } - } -} - -func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { - for _, ecShardInfo := range ecShardInfos { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - count += shardBits.ShardIdCount() - } - return -} - -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { - return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos) -} - -type EcNode struct { - info *master_pb.DataNodeInfo - freeEcSlot int -} - -func sortEcNodes(ecNodes []*EcNode) { - sort.Slice(ecNodes, func(i, j int) bool { - return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot - }) -} - -func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { - - // list all possible locations - var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) - return err - }) - if err != nil { - return nil, 0, err - } - - // find out all volume servers with one slot left. 
- eachDataNode(resp.TopologyInfo, func(dn *master_pb.DataNodeInfo) { - if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 { - ecNodes = append(ecNodes, &EcNode{ - info: dn, - freeEcSlot: int(freeEcSlots), - }) - totalFreeEcSlots += freeEcSlots - } - }) - - sortEcNodes(ecNodes) - - return -} - func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse @@ -360,9 +235,11 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, se return } - quietSeconds := int64((quietPeriod * time.Second).Seconds()) + quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() + fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) + vidMap := make(map[uint32]bool) for _, dc := range resp.TopologyInfo.DataCenterInfos { for _, r := range dc.RackInfos { -- cgit v1.2.3 From d85b41b9048cb336265298e47789838f7a5e597c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 3 Jun 2019 10:38:21 -0700 Subject: fix ec.encode not finding the local ec shards --- weed/shell/command_ec_encode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 94265a874..f84fcf303 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -97,7 +97,7 @@ func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, // balance the ec shards to current cluster err = spreadEcShards(ctx, commandEnv, vid, collection, locations) if err != nil { - return fmt.Errorf("spread ec shards for volume %d to %s: %v", vid, locations[0].Url, err) + return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } return nil -- cgit v1.2.3 From b9e138713c8e2f53cf96132b5ff077ded67c5c20 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Jun 2019 00:13:13 -0700 Subject: ec.encode: add -fullPercent option --- weed/shell/command_ec_encode.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index f84fcf303..e0d4cf380 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -31,6 +31,9 @@ func (c *commandEcEncode) Name() string { func (c *commandEcEncode) Help() string { return `apply erasure coding to a volume + ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h] + ec.encode [-collection=""] [-volumeId=] + This command will: 1. freeze one volume 2. 
apply erasure coding to the volume @@ -53,6 +56,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") collection := encodeCommand.String("collection", "", "the collection name") + fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") if err = encodeCommand.Parse(args); err != nil { return nil @@ -67,7 +71,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } // apply to all volumes in the collection - volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } @@ -224,7 +228,7 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { @@ -246,7 +250,9 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, se for _, dn := range r.DataNodeInfos { for _, v := range dn.VolumeInfos { if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { - vidMap[v.Id] = true + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true + } } } } -- cgit v1.2.3 From ede876cfdb0116557dd197a7951957dab6745c24 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Jun 2019 01:30:24 -0700 Subject: periodic scripts exeuction from leader master --- weed/shell/command_ec_encode.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e0d4cf380..8b01f6cfc 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -18,7 +18,7 @@ import ( ) func init() { - commands = append(commands, &commandEcEncode{}) + Commands = append(Commands, &commandEcEncode{}) } type commandEcEncode struct { @@ -51,7 +51,7 @@ func (c *commandEcEncode) Help() string { ` } -func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") @@ -85,9 +85,9 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr return nil } -func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) { +func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.masterClient.GetLocations(uint32(vid)) + locations 
:= commandEnv.MasterClient.GetLocations(uint32(vid)) if len(locations) == 0 { return fmt.Errorf("volume %d not found", vid) } @@ -121,7 +121,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum } -func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) if err != nil { @@ -228,10 +228,10 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) -- cgit v1.2.3 From f9d8bd51ad8cf4597559c3e3e327bbb3432719b7 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 10 Jun 2019 21:32:56 -0700 Subject: ec shard balancing --- weed/shell/command_ec_encode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 8b01f6cfc..686849cb8 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -123,7 +123,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { - allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv) + allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "") if err != nil { return err } @@ -191,7 +191,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia err = copyErr } else { shardIdChan <- copiedShardIds - server.freeEcSlot -= len(copiedShardIds) + server.addEcVolumeShards(volumeId, collection, copiedShardIds) } }(server, startFromShardId, allocated[i]) startFromShardId += uint32(allocated[i]) -- cgit v1.2.3 From 6bc3dee5b37aefc0a04603c6f00670dd6ec2d2ea Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 Jun 2019 10:56:54 -0700 Subject: refactoring --- weed/shell/command_ec_encode.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 686849cb8..492dc77eb 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -245,19 +245,15 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, se fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) vidMap := make(map[uint32]bool) - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, r := range dc.RackInfos { - for _, dn := range 
r.DataNodeInfos { - for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { - if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { - vidMap[v.Id] = true - } - } + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true } } } - } + }) for vid, _ := range vidMap { vids = append(vids, needle.VolumeId(vid)) -- cgit v1.2.3 From 6883f9e3227ba4b7e3db69d180b15203a8578256 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Jun 2019 23:02:22 -0700 Subject: mark volume readonly before ec encoding --- weed/shell/command_ec_encode.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 492dc77eb..3ae6b9101 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -92,6 +92,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return fmt.Errorf("volume %d not found", vid) } + // mark the volume as readonly + err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + if err != nil { + return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) + } + // generate ec shards err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { @@ -107,6 +113,26 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return nil } +func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { + + for _, location := range locations { + + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + return markErr + }) + + if err != nil { + return err + } + + } + + return nil +} + func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { -- cgit v1.2.3 From a3d1296ed9c914dc2ffef87ba7568edfafc6efac Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 27 Jun 2019 12:18:59 -0700 Subject: go fmt --- weed/shell/command_ec_encode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 3ae6b9101..8ad0d51c8 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -119,7 +119,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ - 
VolumeId: uint32(volumeId), + VolumeId: uint32(volumeId), }) return markErr }) -- cgit v1.2.3 From 8afd8d35b3230f6fc286967e8aa9641bd8c1460c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 28 Jul 2019 03:58:13 -0700 Subject: master: followers can also lookup and redirect improve scalability --- weed/shell/command_ec_encode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 8ad0d51c8..f07cb93f9 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -87,8 +87,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location - locations := commandEnv.MasterClient.GetLocations(uint32(vid)) - if len(locations) == 0 { + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { return fmt.Errorf("volume %d not found", vid) } -- cgit v1.2.3 From 5f6b360eb723ef91d8bb5a556a336e3b54b8a96e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 24 Nov 2019 23:13:40 -0800 Subject: adjust error message --- weed/shell/command_ec_encode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index f07cb93f9..4598b4682 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -95,7 +95,7 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, // mark the volume as readonly err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { - return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } // generate ec shards -- cgit v1.2.3 From dda5c6d3cb32550777f1baf5bd00794e10e56d84 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 22 Dec 2019 04:31:36 -0800 Subject: fmt --- weed/shell/command_ec_encode.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 4598b4682..581d3245f 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -8,13 +8,14 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/wdclient" - "google.golang.org/grpc" ) func init() { @@ -281,7 +282,7 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, se } }) - for vid, _ := range vidMap { + for vid := range vidMap { vids = append(vids, needle.VolumeId(vid)) } -- cgit v1.2.3 From 3ebeae0c0ba94659bb39e055ae0278db5bb825f1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 24 Dec 2019 16:52:21 -0800 Subject: ec encode distribute ec data and parity shards evenly --- weed/shell/command_ec_encode.go | 42 ++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git 
a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 581d3245f..58527abf2 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -93,6 +93,8 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return fmt.Errorf("volume %d not found", vid) } + // fmt.Printf("found ec %d shards on %v\n", vid, locations) + // mark the volume as readonly err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { @@ -164,10 +166,10 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle } // calculate how many shards to allocate for these servers - allocated := balancedEcDistribution(allocatedDataNodes) + allocatedEcIds := balancedEcDistribution(allocatedDataNodes) // ask the data nodes to copy from the source volume server - copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0]) + copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) if err != nil { return err } @@ -197,31 +199,29 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle } func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*EcNode, allocated []int, + targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize shardIdChan := make(chan []uint32, len(targetServers)) var wg sync.WaitGroup - startFromShardId := uint32(0) for i, server := range targetServers { - if allocated[i] <= 0 { + if len(allocatedEcIds[i]) <= 0 { continue } wg.Add(1) - go func(server *EcNode, startFromShardId uint32, shardCount int) { + go func(server *EcNode, allocatedEcShardIds []uint32) { defer wg.Done() copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, - startFromShardId, shardCount, volumeId, collection, existingLocation.Url) + allocatedEcShardIds, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr } else { shardIdChan <- copiedShardIds server.addEcVolumeShards(volumeId, collection, copiedShardIds) } - }(server, startFromShardId, allocated[i]) - startFromShardId += uint32(allocated[i]) + }(server, allocatedEcIds[i]) } wg.Wait() close(shardIdChan) @@ -237,18 +237,18 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia return } -func balancedEcDistribution(servers []*EcNode) (allocated []int) { - allocated = make([]int, len(servers)) - allocatedCount := 0 - for allocatedCount < erasure_coding.TotalShardsCount { - for i, server := range servers { - if server.freeEcSlot-allocated[i] > 0 { - allocated[i] += 1 - allocatedCount += 1 - } - if allocatedCount >= erasure_coding.TotalShardsCount { - break - } +func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { + allocated = make([][]uint32, len(servers)) + allocatedShardIdIndex := uint32(0) + serverIndex := 0 + for allocatedShardIdIndex < erasure_coding.TotalShardsCount { + if servers[serverIndex].freeEcSlot > 0 { + allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex) + allocatedShardIdIndex++ + } + serverIndex++ + if serverIndex >= len(servers) { + serverIndex = 0 } } -- cgit v1.2.3 From 
72a64a5cf8c2a5adfe59665a746e013ca948e681 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Jan 2020 14:42:11 -0800 Subject: use the same context object in order to retry --- weed/shell/command_ec_encode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 58527abf2..587b59388 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -120,7 +120,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) @@ -138,7 +138,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, -- cgit v1.2.3 From 892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 21:50:12 -0800 Subject: avoid reusing context object fix https://github.com/chrislusf/seaweedfs/issues/1182 --- weed/shell/command_ec_encode.go | 49 +++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 26 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 587b59388..e22691c00 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -63,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // volumeId is provided if vid != 0 { - return doEcEncode(ctx, commandEnv, *collection, vid) + return doEcEncode(commandEnv, *collection, vid) } // apply to all volumes in the collection - volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { + if err = doEcEncode(commandEnv, *collection, vid); err != nil { return err } } @@ -86,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { +func doEcEncode(commandEnv *CommandEnv, collection string, vid 
needle.VolumeId) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { @@ -96,19 +95,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, // fmt.Printf("found ec %d shards on %v\n", vid, locations) // mark the volume as readonly - err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) + err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } // balance the ec shards to current cluster - err = spreadEcShards(ctx, commandEnv, vid, collection, locations) + err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations) if err != nil { return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } @@ -116,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return nil } -func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { +func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) return markErr @@ -136,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol return nil } -func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -152,7 +151,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err 
error) { - allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "") + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") if err != nil { return err } @@ -169,26 +168,26 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle allocatedEcIds := balancedEcDistribution(allocatedDataNodes) // ask the data nodes to copy from the source volume server - copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) + copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } // ask the source volume server to delete the original volume for _, location := range existingLocations { - err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url) + err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) } @@ -198,9 +197,7 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle } -func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*EcNode, allocatedEcIds [][]uint32, - volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { +func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize shardIdChan := make(chan []uint32, len(targetServers)) @@ -213,7 +210,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia wg.Add(1) go func(server *EcNode, allocatedEcShardIds []uint32) { defer wg.Done() - copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, + copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, allocatedEcShardIds, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr @@ -255,11 +252,11 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp 
*master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { -- cgit v1.2.3 From 97ab8a1976f3ba056af8d5b630dcb43006425b51 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 22:23:59 -0800 Subject: remove ctx if possible --- weed/shell/command_ec_encode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e22691c00..6efb05488 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -107,7 +107,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) } // balance the ec shards to current cluster - err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations) + err = spreadEcShards(commandEnv, vid, collection, locations) if err != nil { return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } @@ -149,7 +149,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, } -func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") if err != nil { -- cgit v1.2.3 From 73564e6a01770316f5ab57e7f4ba8227cedbf1dd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 23 Apr 2020 13:37:31 -0700 Subject: master: add cluster wide lock/unlock operation in weed shell fix https://github.com/chrislusf/seaweedfs/issues/1286 --- weed/shell/command_ec_encode.go | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6efb05488..8f11f8480 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -54,6 +54,10 @@ func (c *commandEcEncode) Help() string { func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) volumeId := encodeCommand.Int("volumeId", 0, "the volume id") collection := encodeCommand.String("collection", "", "the collection name") -- cgit v1.2.3 From 75179544c35575be09f09f32103d5eb4562d28d1 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 17 May 2020 20:20:21 -0700 Subject: logging --- weed/shell/command_ec_encode.go | 1 + 1 file changed, 1 insertion(+) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 8f11f8480..165809d05 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -191,6 +191,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection // ask the source volume server to delete the original volume for _, location := range existingLocations { + fmt.Printf("delete volume %d from %s\n", volumeId, location.Url) err = 
deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) -- cgit v1.2.3 From ae1994cbc1692133cc5f4ac30e8060a2b0d9dda8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 18 Jun 2020 09:52:35 -0700 Subject: erasure coding: fix cases where there are no .ecj files --- weed/shell/command_ec_encode.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'weed/shell/command_ec_encode.go') diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 165809d05..5a8146954 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -123,6 +123,8 @@ func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId for _, location := range locations { + fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), @@ -141,6 +143,8 @@ func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { + fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), @@ -204,6 +208,8 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { + fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url) + // parallelize shardIdChan := make(chan []uint32, len(targetServers)) var wg sync.WaitGroup -- cgit v1.2.3