aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2021-10-27 09:42:23 +0800
committerGitHub <noreply@github.com>2021-10-27 09:42:23 +0800
commit6999325d3631f0e9f4fe4e081650dcf60a1293c5 (patch)
tree99082f145eeee921ae772d0c8601f97f40b7f074 /weed/shell
parentee90edd0e3746ae0f6046dd9e7362aeec821456c (diff)
parent3eeaffeadd9bcff0148d0b1c860ea4ff03e82904 (diff)
downloadseaweedfs-6999325d3631f0e9f4fe4e081650dcf60a1293c5.tar.xz
seaweedfs-6999325d3631f0e9f4fe4e081650dcf60a1293c5.zip
Merge pull request #84 from chrislusf/master
sync
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_ec_decode.go13
-rw-r--r--weed/shell/command_volume_copy.go2
-rw-r--r--weed/shell/command_volume_fix_replication.go16
-rw-r--r--weed/shell/command_volume_move.go28
4 files changed, 51 insertions, 8 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index cfa24cc31..c9f49745b 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -40,6 +41,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
+ forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
if err = encodeCommand.Parse(args); err != nil {
return nil
}
@@ -56,6 +58,17 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return err
}
+ if !*forceChanges {
+ var nodeCount int
+ eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ nodeCount++
+ })
+ if nodeCount < erasure_coding.ParityShardsCount {
+ glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
+ return nil
+ }
+ }
+
// volumeId is provided
if vid != 0 {
return doEcDecode(commandEnv, topologyInfo, *collection, vid)
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 8d34acf8f..b4dfbb78a 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -53,6 +53,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, "")
+ _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, "")
return
}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 5d7506c0b..c003eea91 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -255,13 +255,27 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
}
err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ if resp.ProcessedBytes > 0 {
+ fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes)
+ }
+ }
+
return nil
})
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 675f65341..ec71ba2b3 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -78,7 +78,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
+ lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
@@ -101,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil
}
-func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@@ -141,15 +141,31 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
}
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
})
- if replicateErr == nil {
- lastAppendAtNs = resp.LastAppendAtNs
+ if replicateErr != nil {
+ return replicateErr
}
- return replicateErr
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ if resp.LastAppendAtNs != 0 {
+ lastAppendAtNs = resp.LastAppendAtNs
+ } else {
+ fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes)
+ }
+ }
+
+ return nil
})
return