Diffstat (limited to 'weed/shell')
22 files changed, 1009 insertions, 263 deletions
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go index 8f5f63b46..02790b9e2 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_bucket_delete.go @@ -49,6 +49,6 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i return fmt.Errorf("read buckets: %v", err) } - return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false) + return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil) } diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 4b3d7f0be..28b9cebbd 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -2,6 +2,7 @@ package shell import ( "context" + "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "io" @@ -21,22 +22,32 @@ func (c *commandCollectionDelete) Name() string { func (c *commandCollectionDelete) Help() string { return `delete specified collection - collection.delete <collection_name> + collection.delete -collectin <collection_name> -force ` } func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - if len(args) == 0 { + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + colDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + collectionName := colDeleteCommand.String("collection", "", "collection to delete") + applyBalancing := colDeleteCommand.Bool("force", false, "apply the collection") + if err = colDeleteCommand.Parse(args); err != nil { return nil } - collectionName := args[0] + if !*applyBalancing { + fmt.Fprintf(writer, "collection %s will be deleted. Use -force to apply the change.\n", *collectionName) + return nil + } err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ - Name: collectionName, + Name: *collectionName, }) return err }) @@ -44,7 +55,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ return } - fmt.Fprintf(writer, "collection %s is deleted.\n", collectionName) + fmt.Fprintf(writer, "collection %s is deleted.\n", *collectionName) return nil } diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 1ddb6a490..bb280b7d9 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -28,7 +28,7 @@ func (c *commandEcBalance) Help() string { Algorithm: - For each type of volume server (different max volume count limit){ + func EcBalance() { for each collection: balanceEcVolumes(collectionName) for each rack: diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 0db119d3c..a808335eb 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -173,6 +173,16 @@ type EcNode struct { freeEcSlot int } +func (ecNode *EcNode) localShardIdCount(vid uint32) int { + for _, ecShardInfo := range ecNode.info.EcShardInfos { + if vid == ecShardInfo.Id { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + return shardBits.ShardIdCount() + } + } + return 0 +} + type EcRack struct { ecNodes map[EcNodeId]*EcNode freeEcSlot int @@ -191,7 +201,15 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes } // find out all volume servers with one slot left. 
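// Side note (illustration only, not part of this commit): the localShardIdCount method added
// above counts how many erasure-coded shards of a volume sit on one node by counting the set
// bits of EcIndexBits; erasure_coding.ShardBits.ShardIdCount does the real work. A minimal
// standalone sketch of that bit count, with an assumed example name:
func exampleShardIdCount(ecIndexBits uint32) (count int) {
	for b := ecIndexBits; b > 0; count++ {
		b &= b - 1 // clear the lowest set bit; one iteration per shard present
	}
	return
}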
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(resp.TopologyInfo, selectedDataCenter) + + sortEcNodesByFreeslotsDecending(ecNodes) + + return +} + +func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) { + eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { if selectedDataCenter != "" && selectedDataCenter != dc { return } @@ -205,9 +223,6 @@ func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes }) totalFreeEcSlots += freeEcSlots }) - - sortEcNodesByFreeslotsDecending(ecNodes) - return } @@ -253,6 +268,10 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n }) } +func divide(total, n int) float64 { + return float64(total) / float64(n) +} + func ceilDivide(total, n int) int { return int(math.Ceil(float64(total) / float64(n))) } diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 7177d8ac3..3c5e13663 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -5,7 +5,7 @@ import ( "io" "math" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -52,7 +52,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write return err } - return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + return filer.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) }) diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 96551dd5a..71003714d 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -4,7 +4,7 @@ import ( "fmt" "io" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -70,9 +70,9 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir } } else { fileBlockCount = uint64(len(entry.Chunks)) - fileByteCount = filer2.TotalSize(entry.Chunks) - blockCount += uint64(len(entry.Chunks)) - byteCount += filer2.TotalSize(entry.Chunks) + fileByteCount = filer.FileSize(entry) + blockCount += fileBlockCount + byteCount += fileByteCount } if name != "" && !entry.IsDirectory { diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 36133992f..592ec8be0 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -8,7 +8,7 @@ import ( "strconv" "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -95,7 +95,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n", fileMode, len(entry.Chunks), userName, groupName, - filer2.TotalSize(entry.Chunks), dir, entry.Name) + filer.FileSize(entry), dir, entry.Name) } else { fmt.Fprintf(writer, "%s\n", entry.Name) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 0679ec075..a097a3a4e 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -2,7 +2,9 @@ package shell import ( "fmt" 
+ "github.com/golang/protobuf/proto" "io" + "sort" "github.com/golang/protobuf/jsonpb" @@ -54,6 +56,13 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W Indent: " ", } + sort.Slice(respLookupEntry.Entry.Chunks, func(i, j int) bool { + if respLookupEntry.Entry.Chunks[i].Offset == respLookupEntry.Entry.Chunks[j].Offset { + return respLookupEntry.Entry.Chunks[i].Mtime < respLookupEntry.Entry.Chunks[j].Mtime + } + return respLookupEntry.Entry.Chunks[i].Offset < respLookupEntry.Entry.Chunks[j].Offset + }) + text, marshalErr := m.MarshalToString(respLookupEntry.Entry) if marshalErr != nil { return fmt.Errorf("marshal meta: %v", marshalErr) @@ -61,6 +70,11 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W fmt.Fprintf(writer, "%s\n", text) + bytes, _ := proto.Marshal(respLookupEntry.Entry) + gzippedBytes, _ := util.GzipData(bytes) + zstdBytes, _ := util.ZstdData(bytes) + fmt.Fprintf(writer, "chunks %d meta size: %d gzip:%d zstd:%d\n", len(respLookupEntry.Entry.Chunks), len(bytes), len(gzippedBytes), len(zstdBytes)) + return nil }) diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 69e3c7fd9..53222ca29 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" "os" "sort" @@ -39,14 +40,15 @@ func (c *commandVolumeBalance) Help() string { } func balanceWritableVolumes(){ - idealWritableVolumes = totalWritableVolumes / numVolumeServers + idealWritableVolumeRatio = totalWritableVolumes / totalNumberOfMaxVolumes for hasMovedOneVolume { - sort all volume servers ordered by the number of local writable volumes - pick the volume server A with the lowest number of writable volumes x - pick the volume server B with the highest number of writable volumes y - if y > idealWritableVolumes and x +1 <= idealWritableVolumes { - if B has a writable volume id v that A does not have { - move writable volume v from A to B + sort all volume servers ordered by the localWritableVolumeRatio = localWritableVolumes to localVolumeMax + pick the volume server B with the highest localWritableVolumeRatio y + for any the volume server A with the number of writable volumes x + 1 <= idealWritableVolumeRatio * localVolumeMax { + if y > localWritableVolumeRatio { + if B has a writable volume id v that A does not have, and satisfy v replication requirements { + move writable volume v from A to B + } } } } @@ -81,38 +83,33 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } - typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc) + volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc) + volumeReplicas, _ := collectVolumeReplicaLocations(resp) - for maxVolumeCount, volumeServers := range typeToNodes { - if len(volumeServers) < 2 { - fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount) - continue + if *collection == "EACH_COLLECTION" { + collections, err := ListCollectionNames(commandEnv, true, false) + if err != nil { + return err } - if *collection == "EACH_COLLECTION" { - collections, err := ListCollectionNames(commandEnv, true, false) - if err != nil { - return err - } - for _, c := range collections { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { - return err - } - } - } else if *collection == 
"ALL_COLLECTIONS" { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { - return err - } - } else { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + for _, c := range collections { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { return err } } - + } else if *collection == "ALL_COLLECTIONS" { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + return err + } + } else { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + return err + } } + return nil } -func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -125,7 +122,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit return !v.ReadOnly && v.Size < volumeSizeLimit }) } - if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil { return err } @@ -140,22 +137,21 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit return v.ReadOnly || v.Size >= volumeSizeLimit }) } - if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { return err } return nil } -func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) { - typeToNodes = make(map[uint64][]*Node) +func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter string) (nodes []*Node) { for _, dc := range t.DataCenterInfos { if selectedDataCenter != "" && dc.Id != selectedDataCenter { continue } for _, r := range dc.RackInfos { for _, dn := range r.DataNodeInfos { - typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{ + nodes = append(nodes, &Node{ info: dn, dc: dc.Id, rack: r.Id, @@ -173,6 +169,23 @@ type Node struct { rack string } +func (n *Node) localVolumeRatio() float64 { + return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount)) +} + +func (n *Node) localVolumeNextRatio() float64 { + return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount)) +} + +func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { + n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) + for _, v := range n.info.VolumeInfos { + if fn(v) { + n.selectedVolumes[v.Id] = v + } + } +} + func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { sort.Slice(volumes, func(i, j int) bool { return volumes[i].Size < volumes[j].Size @@ -185,73 +198,146 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv 
*CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { - selectedVolumeCount := 0 +func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { + selectedVolumeCount, volumeMaxCount := 0, 0 for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) + volumeMaxCount += int(dn.info.MaxVolumeCount) } - idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes)) + idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount) - hasMove := true + hasMoved := true - for hasMove { - hasMove = false + for hasMoved { + hasMoved = false sort.Slice(nodes, func(i, j int) bool { - // TODO sort by free volume slots??? - return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes) + return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio() }) - emptyNode, fullNode := nodes[0], nodes[len(nodes)-1] - if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes { - // sort the volumes to move - var candidateVolumes []*master_pb.VolumeInformationMessage - for _, v := range fullNode.selectedVolumes { - candidateVolumes = append(candidateVolumes, v) + fullNode := nodes[len(nodes)-1] + var candidateVolumes []*master_pb.VolumeInformationMessage + for _, v := range fullNode.selectedVolumes { + candidateVolumes = append(candidateVolumes, v) + } + sortCandidatesFn(candidateVolumes) + + for i := 0; i < len(nodes)-1; i++ { + emptyNode := nodes[i] + if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) { + // no more volume servers with empty slots + break } - sortCandidatesFn(candidateVolumes) - - for _, v := range candidateVolumes { - if v.ReplicaPlacement > 0 { - if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { - // TODO this logic is too simple, but should work most of the time - // Need a correct algorithm to handle all different cases - continue - } - } - if _, found := emptyNode.selectedVolumes[v.Id]; !found { - if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { - delete(fullNode.selectedVolumes, v.Id) - emptyNode.selectedVolumes[v.Id] = v - hasMove = true - break - } else { - return err - } - } + hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing) + if err != nil { + return + } + if hasMoved { + // moved one volume + break } } } return nil } -func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { +func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) { + + for _, v := range candidateVolumes { + hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, fullNode, v, emptyNode, applyBalancing) + if err != nil { + return + } + if hasMoved { + break + } + } + return +} + +func maybeMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolume *master_pb.VolumeInformationMessage, emptyNode *Node, applyChange bool) (hasMoved bool, err error) { + + if candidateVolume.ReplicaPlacement > 0 { + replicaPlacement, _ 
:= super_block.NewReplicaPlacementFromByte(byte(candidateVolume.ReplicaPlacement)) + if !isGoodMove(replicaPlacement, volumeReplicas[candidateVolume.Id], fullNode, emptyNode) { + return false, nil + } + } + if _, found := emptyNode.selectedVolumes[candidateVolume.Id]; !found { + if err = moveVolume(commandEnv, candidateVolume, fullNode, emptyNode, applyChange); err == nil { + adjustAfterMove(candidateVolume, volumeReplicas, fullNode, emptyNode) + return true, nil + } else { + return + } + } + return +} + +func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyChange bool) error { collectionPrefix := v.Collection + "_" if v.Collection == "" { collectionPrefix = "" } fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) - if applyBalancing { + if applyChange { return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) } return nil } -func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { - node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) - for _, v := range node.info.VolumeInfos { - if fn(v) { - node.selectedVolumes[v.Id] = v +func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool { + for _, replica := range existingReplicas { + if replica.location.dataNode.Id == targetNode.info.Id && + replica.location.rack == targetNode.rack && + replica.location.dc == targetNode.dc { + // never move to existing nodes + return false + } + } + dcs, racks := make(map[string]bool), make(map[string]int) + for _, replica := range existingReplicas { + if replica.location.dataNode.Id != sourceNode.info.Id { + dcs[replica.location.DataCenter()] = true + racks[replica.location.Rack()]++ + } + } + + dcs[targetNode.dc] = true + racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++ + + if len(dcs) > placement.DiffDataCenterCount+1 { + return false + } + + if len(racks) > placement.DiffRackCount+placement.DiffDataCenterCount+1 { + return false + } + + for _, sameRackCount := range racks { + if sameRackCount > placement.SameRackCount+1 { + return false + } + } + + return true + +} + +func adjustAfterMove(v *master_pb.VolumeInformationMessage, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, emptyNode *Node) { + delete(fullNode.selectedVolumes, v.Id) + if emptyNode.selectedVolumes != nil { + emptyNode.selectedVolumes[v.Id] = v + } + existingReplicas := volumeReplicas[v.Id] + for _, replica := range existingReplicas { + if replica.location.dataNode.Id == fullNode.info.Id && + replica.location.rack == fullNode.rack && + replica.location.dc == fullNode.dc { + replica.location.dc = emptyNode.dc + replica.location.rack = emptyNode.rack + replica.location.dataNode = emptyNode.info + return } } } diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go new file mode 100644 index 000000000..9e154dc00 --- /dev/null +++ b/weed/shell/command_volume_balance_test.go @@ -0,0 +1,155 @@ +package shell + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +type testMoveCase struct { + name string + replication string + replicas []*VolumeReplica + sourceLocation location + targetLocation location + expected bool +} + +func TestIsGoodMove(t *testing.T) { + + var tests = 
[]testMoveCase{ + + { + name: "test 100 move to spread into proper data centers", + replication: "100", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + + { + name: "test move to the same node", + replication: "001", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + expected: false, + }, + + { + name: "test move to the same rack, but existing node", + replication: "001", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + expected: false, + }, + + { + name: "test move to the same rack, a new node", + replication: "001", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + + { + name: "test 010 move all to the same rack", + replication: "010", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: false, + }, + + { + name: "test 010 move to spread racks", + replication: "010", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + + { + name: "test 010 move to spread racks", + replication: "010", + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + }, + sourceLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + targetLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + } + + for _, tt := range tests { + replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication) + println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name) + sourceNode := &Node{ + info: tt.sourceLocation.dataNode, + dc: tt.sourceLocation.dc, + rack: tt.sourceLocation.rack, + } + targetNode := &Node{ + info: tt.targetLocation.dataNode, + 
dc: tt.targetLocation.dc, + rack: tt.targetLocation.rack, + } + if isGoodMove(replicaPlacement, tt.replicas, sourceNode, targetNode) != tt.expected { + t.Errorf("%s: expect %v move from %v to %s, replication:%v", + tt.name, tt.expected, tt.sourceLocation, tt.targetLocation, tt.replication) + } + } + +} diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index ff976c345..539bdb515 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -28,7 +28,7 @@ func (c *commandVolumeConfigureReplication) Name() string { func (c *commandVolumeConfigureReplication) Help() string { return `change volume replication value - This command changes a volume replication value. It should be followed by volume.fix.replication. + This command changes a volume replication value. It should be followed by "volume.fix.replication". ` } diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index cdd10863f..f9edf9431 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -1,6 +1,7 @@ package shell import ( + "flag" "fmt" "io" @@ -21,7 +22,7 @@ func (c *commandVolumeCopy) Name() string { func (c *commandVolumeCopy) Help() string { return `copy a volume from one volume server to another volume server - volume.copy <source volume server host:port> <target volume server host:port> <volume id> + volume.copy -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> This command copies a volume from one volume server to another volume server. Usually you will want to unmount the volume first before copying. @@ -35,16 +36,17 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io. 
return } - if len(args) != 3 { - fmt.Fprintf(writer, "received args: %+v\n", args) - return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>") + volCopyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volCopyCommand.Int("volumeId", 0, "the volume id") + sourceNodeStr := volCopyCommand.String("source", "", "the source volume server <host>:<port>") + targetNodeStr := volCopyCommand.String("target", "", "the target volume server <host>:<port>") + if err = volCopyCommand.Parse(args); err != nil { + return nil } - sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2] - volumeId, err := needle.NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) - } + sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr + + volumeId := needle.VolumeId(*volumeIdInt) if sourceVolumeServer == targetVolumeServer { return fmt.Errorf("source and target volume servers are the same!") diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go index c5cc9e277..187caa1a4 100644 --- a/weed/shell/command_volume_delete.go +++ b/weed/shell/command_volume_delete.go @@ -1,7 +1,7 @@ package shell import ( - "fmt" + "flag" "io" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -21,7 +21,7 @@ func (c *commandVolumeDelete) Name() string { func (c *commandVolumeDelete) Help() string { return `delete a live volume from one volume server - volume.delete <volume server host:port> <volume id> + volume.delete -node <volume server host:port> -volumeId <volume id> This command deletes a volume from one volume server. @@ -34,16 +34,16 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i return } - if len(args) != 2 { - fmt.Fprintf(writer, "received args: %+v\n", args) - return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + volDeleteCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volDeleteCommand.Int("volumeId", 0, "the volume id") + nodeStr := volDeleteCommand.String("node", "", "the volume server <host>:<port>") + if err = volDeleteCommand.Parse(args); err != nil { + return nil } - sourceVolumeServer, volumeIdString := args[0], args[1] - volumeId, err := needle.NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) - } + sourceVolumeServer := *nodeStr + + volumeId := needle.VolumeId(*volumeIdInt) return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6b5e4e735..471b24a2a 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -2,9 +2,10 @@ package shell import ( "context" + "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" - "math/rand" "sort" "github.com/chrislusf/seaweedfs/weed/operation" @@ -27,16 +28,18 @@ func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Help() string { return `add replicas to volumes that are missing replicas - This command file all under-replicated volumes, and find volume servers with free slots. + This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop. + + This command also finds all under-replicated volumes, and finds volume servers with free slots. 
If the free slots satisfy the replication requirement, the volume content is copied over and mounted. volume.fix.replication -n # do not take action - volume.fix.replication # actually copying the volume files and mount the volume + volume.fix.replication # actually deleting or copying the volume files and mount the volume Note: * each time this will only add back one replica for one volume id. If there are multiple replicas are missing, e.g. multiple volume servers are new, you may need to run this multiple times. - * do not run this too quick within seconds, since the new volume replica may take a few seconds + * do not run this too quickly within seconds, since the new volume replica may take a few seconds to register itself to the master. ` @@ -48,11 +51,14 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return } - takeAction := true - if len(args) > 0 && args[0] == "-n" { - takeAction = false + volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") + if err = volFixReplicationCommand.Parse(args); err != nil { + return nil } + takeAction := !*skipChange + var resp *master_pb.VolumeListResponse err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) @@ -64,53 +70,89 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all volumes that needs replication // collect all data nodes - replicatedVolumeLocations := make(map[uint32][]location) - replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) + volumeReplicas, allLocations := collectVolumeReplicaLocations(resp) + + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } + + // find all under replicated volumes + var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[0] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + } + } + + if len(overReplicatedVolumeIds) > 0 { + return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) + } + + if len(underReplicatedVolumeIds) == 0 { + return nil + } + + // find the most under populated data nodes + keepDataNodesSorted(allLocations) + + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) + +} + +func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) { + volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { - if v.ReplicaPlacement > 0 { - replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) - replicatedVolumeInfo[v.Id] = v - } + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], 
&VolumeReplica{ + location: &loc, + info: v, + }) } allLocations = append(allLocations, loc) }) + return volumeReplicas, allLocations +} - // find all under replicated volumes - underReplicatedVolumeLocations := make(map[uint32][]location) - for vid, locations := range replicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(locations) { - underReplicatedVolumeLocations[vid] = locations +func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range overReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) + + replica := pickOneReplicaToDelete(replicas, replicaPlacement) + + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) + + if !takeAction { + break } - } - if len(underReplicatedVolumeLocations) == 0 { - return fmt.Errorf("no under replicated volumes") - } + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil { + return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) + } - if len(allLocations) == 0 { - return fmt.Errorf("no data nodes at all") } + return nil +} - // find the most under populated data nodes - keepDataNodesSorted(allLocations) - - for vid, locations := range underReplicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range underReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replica := pickOneReplicaToCopyFrom(replicas) + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false for _, dst := range allLocations { // check whether data nodes satisfy the constraints - if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // ask the volume server to replicate the volume - sourceNodes := underReplicatedVolumeLocations[vid] - sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) if !takeAction { break @@ -118,11 +160,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), 
&volume_server_pb.VolumeCopyRequest{ - VolumeId: volumeInfo.Id, - SourceDataNode: sourceNode.dataNode.Id, + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, }) if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } return nil }) @@ -138,11 +180,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } } if !foundNewLocation { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } } - return nil } @@ -182,22 +223,15 @@ func keepDataNodesSorted(dataNodes []location) { return false } */ -func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { +func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { - existingDataNodes := make(map[string]int) - for _, loc := range existingLocations { - existingDataNodes[loc.String()] += 1 - } - sameDataNodeCount := existingDataNodes[possibleLocation.String()] - // avoid duplicated volume on the same data node - if sameDataNodeCount > 0 { + existingDataCenters, _, existingDataNodes := countReplicas(replicas) + + if _, found := existingDataNodes[possibleLocation.String()]; found { + // avoid duplicated volume on the same data node return false } - existingDataCenters := make(map[string]int) - for _, loc := range existingLocations { - existingDataCenters[loc.DataCenter()] += 1 - } primaryDataCenters, _ := findTopKeys(existingDataCenters) // ensure data center count is within limit @@ -218,20 +252,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi } // now this is one of the primary dcs - existingRacks := make(map[string]int) - for _, loc := range existingLocations { - if loc.DataCenter() != possibleLocation.DataCenter() { + primaryDcRacks := make(map[string]int) + for _, replica := range replicas { + if replica.location.DataCenter() != possibleLocation.DataCenter() { continue } - existingRacks[loc.Rack()] += 1 + primaryDcRacks[replica.location.Rack()] += 1 } - primaryRacks, _ := findTopKeys(existingRacks) - sameRackCount := existingRacks[possibleLocation.Rack()] + primaryRacks, _ := findTopKeys(primaryDcRacks) + sameRackCount := primaryDcRacks[possibleLocation.Rack()] // ensure rack count is within limit - if _, found := existingRacks[possibleLocation.Rack()]; !found { + if _, found := primaryDcRacks[possibleLocation.Rack()]; !found { // different from existing racks - if len(existingRacks) < replicaPlacement.DiffRackCount+1 { + if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 { // lack on different racks return true } else { @@ -280,6 +314,11 @@ func isAmong(key string, keys []string) bool { return false } +type VolumeReplica struct { + location *location + info *master_pb.VolumeInformationMessage +} + type location struct { dc string rack string @@ -305,3 +344,43 @@ func (l location) Rack() string { func (l location) DataCenter() string { return l.dc } + +func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica { + mostRecent := replicas[0] + for _, replica := range replicas { + if 
replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond { + mostRecent = replica + } + } + return mostRecent +} + +func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) { + diffDc = make(map[string]int) + diffRack = make(map[string]int) + diffNode = make(map[string]int) + for _, replica := range replicas { + diffDc[replica.location.DataCenter()] += 1 + diffRack[replica.location.Rack()] += 1 + diffNode[replica.location.String()] += 1 + } + return +} + +func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica { + + allSame := true + oldest := replicas[0] + for _, replica := range replicas { + if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond { + oldest = replica + allSame = false + } + } + if !allSame { + return oldest + } + + // TODO what if all the replicas have the same timestamp? + return oldest +} diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go index 4cfbd96aa..bb61be1ef 100644 --- a/weed/shell/command_volume_fix_replication_test.go +++ b/weed/shell/command_volume_fix_replication_test.go @@ -8,11 +8,11 @@ import ( ) type testcase struct { - name string - replication string - existingLocations []location - possibleLocation location - expected bool + name string + replication string + replicas []*VolumeReplica + possibleLocation location + expected bool } func TestSatisfyReplicaPlacementComplicated(t *testing.T) { @@ -21,8 +21,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 100 negative", replication: "100", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, }, possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, expected: false, @@ -30,8 +32,10 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 100 positive", replication: "100", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, }, possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, expected: true, @@ -39,10 +43,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 022 positive", replication: "022", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: true, @@ -50,10 +60,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 022 negative", replication: "022", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", 
&master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: false, @@ -61,10 +77,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 210 moved from 200 positive", replication: "210", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: true, @@ -72,10 +94,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 210 moved from 200 negative extra dc", replication: "210", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: false, @@ -83,10 +111,16 @@ func TestSatisfyReplicaPlacementComplicated(t *testing.T) { { name: "test 210 moved from 200 negative extra data node", replication: "210", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: false, @@ -103,9 +137,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) { { name: "test 011 same existing rack", replication: "011", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, expected: true, @@ -113,9 +151,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) { { name: "test 011 negative", replication: "011", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, expected: false, @@ -123,9 +165,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) { { name: 
"test 011 different existing racks", replication: "011", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, expected: true, @@ -133,9 +179,13 @@ func TestSatisfyReplicaPlacement01x(t *testing.T) { { name: "test 011 different existing racks negative", replication: "011", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, expected: false, @@ -152,8 +202,10 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) { { name: "test 001", replication: "001", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, expected: true, @@ -161,9 +213,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) { { name: "test 002 positive", replication: "002", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, expected: true, @@ -171,9 +227,13 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) { { name: "test 002 negative, repeat the same node", replication: "002", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, expected: false, @@ -181,10 +241,16 @@ func TestSatisfyReplicaPlacement00x(t *testing.T) { { name: "test 002 negative, enough node already", replication: "002", - existingLocations: []location{ - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, - {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + replicas: []*VolumeReplica{ + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + { + location: &location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, }, possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, expected: false, @@ -199,9 +265,9 @@ func runTests(tests []testcase, t *testing.T) { for _, tt := range tests { replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication) println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name) - 
if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected { + if satisfyReplicaPlacement(replicaPlacement, tt.replicas, tt.possibleLocation) != tt.expected { t.Errorf("%s: expect %v add %v to %s %+v", - tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations) + tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.replicas) } } } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index cf5ad6d6d..4b3568acb 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -11,7 +11,7 @@ import ( "path/filepath" "sync" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -197,7 +197,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer files[i.vid].Write(buffer) } }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { - dChunks, mChunks, resolveErr := filer2.ResolveChunkManifest(filer2.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) if resolveErr != nil { return nil } diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index ded7b7e66..bd588d0b5 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -2,7 +2,7 @@ package shell import ( "context" - "fmt" + "flag" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -25,7 +25,7 @@ func (c *commandVolumeMount) Name() string { func (c *commandVolumeMount) Help() string { return `mount a volume from one volume server - volume.mount <volume server host:port> <volume id> + volume.mount -node <volume server host:port> -volumeId <volume id> This command mounts a volume from one volume server. 
@@ -38,16 +38,16 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io return } - if len(args) != 2 { - fmt.Fprintf(writer, "received args: %+v\n", args) - return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + volMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volMountCommand.Int("volumeId", 0, "the volume id") + nodeStr := volMountCommand.String("node", "", "the volume server <host>:<port>") + if err = volMountCommand.Parse(args); err != nil { + return nil } - sourceVolumeServer, volumeIdString := args[0], args[1] - volumeId, err := needle.NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) - } + sourceVolumeServer := *nodeStr + + volumeId := needle.VolumeId(*volumeIdInt) return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 392b947e7..b136604e5 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -2,6 +2,7 @@ package shell import ( "context" + "flag" "fmt" "io" "log" @@ -27,7 +28,7 @@ func (c *commandVolumeMove) Name() string { func (c *commandVolumeMove) Help() string { return `move a live volume from one volume server to another volume server - volume.move <source volume server host:port> <target volume server host:port> <volume id> + volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> This command move a live volume from one volume server to another volume server. Here are the steps: @@ -48,16 +49,17 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. return } - if len(args) != 3 { - fmt.Fprintf(writer, "received args: %+v\n", args) - return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>") + volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id") + sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>") + targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>") + if err = volMoveCommand.Parse(args); err != nil { + return nil } - sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2] - volumeId, err := needle.NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) - } + sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr + + volumeId := needle.VolumeId(*volumeIdInt) if sourceVolumeServer == targetVolumeServer { return fmt.Errorf("source and target volume servers are the same!") @@ -91,6 +93,43 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { + // check to see if the volume is already read-only and if its not then we need + // to mark it as read-only and then before we return we need to undo what we + // did + var shouldMarkWritable bool + defer func() { + if !shouldMarkWritable { + return + } + + clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, writableErr := 
volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ + VolumeId: uint32(volumeId), + }) + return writableErr + }) + if clientErr != nil { + log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr) + } + }() + + err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(volumeId), + }) + if statusErr == nil && !resp.IsReadOnly { + shouldMarkWritable = true + _, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + return readonlyErr + } + return statusErr + }) + if err != nil { + return + } + err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go new file mode 100644 index 000000000..214783ee1 --- /dev/null +++ b/weed/shell/command_volume_server_evacuate.go @@ -0,0 +1,208 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "io" + "sort" +) + +func init() { + Commands = append(Commands, &commandVolumeServerEvacuate{}) +} + +type commandVolumeServerEvacuate struct { +} + +func (c *commandVolumeServerEvacuate) Name() string { + return "volumeServer.evacuate" +} + +func (c *commandVolumeServerEvacuate) Help() string { + return `move out all data on a volume server + + volumeServer.evacuate -node <host:port> + + This command moves all data away from the volume server. + The volumes on the volume servers will be redistributed. + + Usually this is used to prepare to shut down or upgrade the volume server. + + Sometimes a volume can not be moved because there is no + good destination to meet the replication requirement. + E.g. a volume with replication 001 in a cluster with 2 volume servers can not be moved. + You can use "-skipNonMoveable" to skip these volumes and move the rest.
+ +` } + +func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server") + skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") + applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") + if err = vsEvacuateCommand.Parse(args); err != nil { + return nil + } + + if *volumeServer == "" { + return fmt.Errorf("need to specify volume server by -node=<host>:<port>") + } + + return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer) + +} + +func volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) { + // 1. confirm the volume server is part of the cluster + // 2. collect all other volume servers, sort by empty slots + // 3. move to any other volume server as long as it satisfies the replication requirements + + // list all the volumes + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + if err := evacuateNormalVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + return err + } + + if err := evacuateEcVolumes(commandEnv, resp, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + return err + } + + return nil +} + +func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { + // find this volume server + volumeServers := collectVolumeServersByDc(resp.TopologyInfo, "") + thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer) + if thisNode == nil { + return fmt.Errorf("%s is not found in this cluster", volumeServer) + } + + // move away normal volumes + volumeReplicas, _ := collectVolumeReplicaLocations(resp) + for _, vol := range thisNode.info.VolumeInfos { + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + if err != nil { + return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) + fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) + } else { + return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + } + } + } + return nil +} + +func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListResponse, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { + // find this ec volume server + ecNodes, _ := collectEcVolumeServersByDc(resp.TopologyInfo, "") + thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer) + if thisNode == nil { + return fmt.Errorf("%s is not found in this cluster", volumeServer) + } + + // move away ec volumes + for _, ecShardInfo := range thisNode.info.EcShardInfos { + hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + if err != nil {
+ return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) + } else { + return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + } + } + } + return nil +} + +func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) { + + for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { + + sort.Slice(otherNodes, func(i, j int) bool { + return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id) + }) + + for i := 0; i < len(otherNodes); i++ { + emptyNode := otherNodes[i] + err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange) + if err != nil { + return + } else { + hasMoved = true + break + } + } + if !hasMoved { + return + } + } + + return +} + +func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { + sort.Slice(otherNodes, func(i, j int) bool { + return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio() + }) + + for i := 0; i < len(otherNodes); i++ { + emptyNode := otherNodes[i] + hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange) + if err != nil { + return + } + if hasMoved { + break + } + } + return +} + +func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) { + for _, node := range volumeServers { + if node.info.Id == thisServer { + thisNode = node + continue + } + otherNodes = append(otherNodes, node) + } + return +} + +func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) { + for _, node := range volumeServers { + if node.info.Id == thisServer { + thisNode = node + continue + } + otherNodes = append(otherNodes, node) + } + return +} diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go new file mode 100644 index 000000000..2a2e56e86 --- /dev/null +++ b/weed/shell/command_volume_server_leave.go @@ -0,0 +1,67 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "google.golang.org/grpc" + "io" +) + +func init() { + Commands = append(Commands, &commandVolumeServerLeave{}) +} + +type commandVolumeServerLeave struct { +} + +func (c *commandVolumeServerLeave) Name() string { + return "volumeServer.leave" +} + +func (c *commandVolumeServerLeave) Help() string { + return `stop a volume server from sending heartbeats to the master + + volume.unmount -node <volume server host:port> -force + + This command enables gracefully shutting down the volume server. + The volume server will stop sending heartbeats to the master. + After draining the traffic for a few seconds, you can safely shut down the volume server. + + This operation is not revocable unless the volume server is restarted. 
+` +} + +func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeServer := vsLeaveCommand.String("node", "", "<host>:<port> of the volume server") + if err = vsLeaveCommand.Parse(args); err != nil { + return nil + } + + if *volumeServer == "" { + return fmt.Errorf("need to specify volume server by -node=<host>:<port>") + } + + return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer) + +} + +func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) { + return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{}) + if leaveErr != nil { + fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr) + } else { + fmt.Fprintf(writer, "stopped heartbeat in volume server %s. After a few seconds to drain traffic, it will be safe to stop the volume server.\n", volumeServer) + } + return leaveErr + }) +} diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index 7596bb4c8..f7e5a501b 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -2,7 +2,7 @@ package shell import ( "context" - "fmt" + "flag" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -25,7 +25,7 @@ func (c *commandVolumeUnmount) Name() string { func (c *commandVolumeUnmount) Help() string { return `unmount a volume from one volume server - volume.unmount <volume server host:port> <volume id> + volume.unmount -node <volume server host:port> -volumeId <volume id> This command unmounts a volume from one volume server. @@ -38,16 +38,16 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer return } - if len(args) != 2 { - fmt.Fprintf(writer, "received args: %+v\n", args) - return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + volUnmountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volUnmountCommand.Int("volumeId", 0, "the volume id") + nodeStr := volUnmountCommand.String("node", "", "the volume server <host>:<port>") + if err = volUnmountCommand.Parse(args); err != nil { + return nil } - sourceVolumeServer, volumeIdString := args[0], args[1] - volumeId, err := needle.NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) - } + sourceVolumeServer := *nodeStr + + volumeId := needle.VolumeId(*volumeIdInt) return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 4632a1fb0..2d5166acf 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -66,7 +66,7 @@ func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool args[i] = strings.Trim(string(cmds[1+i]), "\"'") } - cmd := strings.ToLower(cmds[0]) + cmd := cmds[0] if cmd == "help" || cmd == "?" { printHelp(cmds) } else if cmd == "exit" || cmd == "quit" { |
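Note: several commands in this change (collection.delete, volume.mount, volume.unmount, volume.move) switch from positional arguments to flag.FlagSet parsing. The following is a minimal standalone sketch of that pattern, not SeaweedFS code; the command name and the run/main wrappers here are illustrative only.

package main

import (
	"flag"
	"fmt"
)

// run mimics how a shell command parses its arguments with flag.FlagSet.
func run(args []string) error {
	fs := flag.NewFlagSet("volume.mount", flag.ContinueOnError)
	volumeID := fs.Int("volumeId", 0, "the volume id")
	node := fs.String("node", "", "the volume server <host>:<port>")
	if err := fs.Parse(args); err != nil {
		// like the commands above, swallow the parse error so the shell session keeps running
		return nil
	}
	fmt.Printf("would mount volume %d on %s\n", *volumeID, *node)
	return nil
}

func main() {
	_ = run([]string{"-node", "127.0.0.1:8080", "-volumeId", "3"})
}

With flag.ContinueOnError, a parse failure prints the flag usage to stderr and the command simply returns, which matches how the commands above treat bad arguments.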
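Note: the copyVolume change above checks the source volume's status, marks it read-only before copying if it was writable, and restores writability in a deferred call. A simplified sketch of that control flow, with the volume_server_pb gRPC calls replaced by a hypothetical in-memory volumeState, might look like this:

package main

import "fmt"

// volumeState stands in for the remote volume's read-only status.
type volumeState struct{ readOnly bool }

func (v *volumeState) isReadOnly() bool { return v.readOnly }
func (v *volumeState) markReadonly()    { v.readOnly = true }
func (v *volumeState) markWritable()    { v.readOnly = false }

func copyVolumeSketch(v *volumeState) (err error) {
	// only restore writability if this function was the one that revoked it
	var shouldMarkWritable bool
	defer func() {
		if shouldMarkWritable {
			v.markWritable()
		}
	}()

	if !v.isReadOnly() {
		shouldMarkWritable = true
		v.markReadonly()
	}

	// ... copy the volume data while it cannot change underneath us ...
	fmt.Println("copying while read-only:", v.isReadOnly())
	return nil
}

func main() {
	v := &volumeState{}
	_ = copyVolumeSketch(v)
	fmt.Println("read-only after copy:", v.isReadOnly())
}

The shouldMarkWritable guard ensures the defer only undoes a state change made by this call, so a volume that was already read-only before the copy stays read-only afterwards.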
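Note: volumeServer.evacuate picks a destination for each volume by sorting the other servers by how loaded they already are (localVolumeRatio for normal volumes, localShardIdCount for EC shards) and trying them in order until one accepts the move. A rough standalone sketch of that selection loop, using stand-in node and tryMove helpers rather than the real topology structures, is shown below.

package main

import (
	"fmt"
	"sort"
)

// node is a stand-in for the topology node types used by the evacuate command.
type node struct {
	id    string
	used  int
	limit int
}

func (n *node) ratio() float64 { return float64(n.used) / float64(n.limit) }

// tryMove is a stand-in for maybeMoveOneVolume: the candidate accepts the
// volume only while it still has free slots.
func tryMove(vol int, dst *node) bool {
	if dst.used < dst.limit {
		dst.used++
		return true
	}
	return false
}

// moveAwayOneVolume tries the least-loaded candidates first.
func moveAwayOneVolume(vol int, others []*node) bool {
	sort.Slice(others, func(i, j int) bool { return others[i].ratio() < others[j].ratio() })
	for _, candidate := range others {
		if tryMove(vol, candidate) {
			fmt.Printf("volume %d moved to %s\n", vol, candidate.id)
			return true
		}
	}
	return false
}

func main() {
	others := []*node{{id: "b", used: 7, limit: 8}, {id: "c", used: 2, limit: 8}}
	if !moveAwayOneVolume(1, others) {
		fmt.Println("volume 1 could not be moved")
	}
}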
