diff options
Diffstat (limited to 'weed/shell')
24 files changed, 116 insertions, 92 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index b1ca926d5..d15c69d43 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "sort" @@ -215,10 +216,10 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle duplicatedShardIds := []uint32{uint32(shardId)} for _, ecNode := range ecNodes[1:] { - if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil { return err } - if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil { return err } ecNode.deleteEcVolumeShards(vid, duplicatedShardIds) diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index fd35bb14b..51c8c32cd 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,6 +3,7 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "math" "sort" @@ -22,20 +23,22 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, if applyBalancing { + existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info) + // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds) if err != nil { return err } // ask source node to delete the shard, and maybe the ecx file - err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds) if err != nil { return err } @@ -53,13 +56,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer *EcNode, shardIdsToCopy []uint32, - volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + targetAddress := pb.NewServerAddressFromDataNode(targetServer.info) + err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - if targetServer.info.Id != existingLocation { + if targetAddress != existingLocation { fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ @@ -69,7 +73,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, CopyEcxFile: true, CopyEcjFile: true, CopyVifFile: true, - SourceDataNode: existingLocation, + SourceDataNode: string(existingLocation), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr) @@ -86,7 +90,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr) } - if targetServer.info.Id != existingLocation { + if targetAddress != existingLocation { copiedShardIds = shardIdsToCopy glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) } @@ -233,7 +237,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter return } -func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error { fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) @@ -248,7 +252,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin } -func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error { fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) @@ -261,7 +265,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s }) } -func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error { fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index e4d597d84..68b7820b2 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" @@ -100,7 +101,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return nil } -func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { @@ -132,7 +133,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, ta return nil } -func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) @@ -148,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c } -func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) { maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits @@ -185,7 +186,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur CopyEcxFile: false, CopyEcjFile: true, CopyVifFile: true, - SourceDataNode: loc, + SourceDataNode: string(loc), }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr) @@ -243,14 +244,14 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri return } -func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits { +func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits { - nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) + nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { for _, v := range diskInfo.EcShardInfos { if v.Id == uint32(vid) { - nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits) } } } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 014b9bab7..39ca39a4f 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "sync" "time" @@ -106,7 +107,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, } // generate ec shards - err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url) + err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress()) if err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } @@ -120,7 +121,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, return nil } -func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) @@ -161,13 +162,13 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection } // unmount the to be deleted shards - err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds) if err != nil { return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds) if err != nil { return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } @@ -175,7 +176,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection // ask the source volume server to delete the original volume for _, location := range existingLocations { fmt.Printf("delete volume %d from %s\n", volumeId, location.Url) - err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) + err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress()) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) } @@ -194,7 +195,7 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) { defer wg.Done() copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, - allocatedEcShardIds, volumeId, collection, existingLocation.Url) + allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress()) if copyErr != nil { err = copyErr } else { diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 8d5d7bb91..3bdb9b4a0 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -141,7 +142,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st // clean up working files // ask the rebuilder to delete the copied shards - err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds) if err != nil { fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) } @@ -153,13 +154,13 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st } // generate ec shards, and maybe ecx file - generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) + generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info)) if err != nil { return err } // mount the generated shards - err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds) if err != nil { return err } @@ -169,7 +170,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st return nil } -func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { +func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) { err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ @@ -212,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index d7cc2efef..2cbe83e21 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -63,8 +63,8 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. fileName := *outputFileName if fileName == "" { t := time.Now() - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + fileName = fmt.Sprintf("%s-%4d%02d%02d-%02d%02d%02d.meta", + commandEnv.option.FilerAddress.ToHttpAddress(), t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) } dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) @@ -105,7 +105,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. }) if err == nil { - fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName) + fmt.Fprintf(writer, "meta data for http://%s%s is saved to %s\n", commandEnv.option.FilerAddress.ToHttpAddress(), path, fileName) } return err diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 1ba31292c..4f893df7a 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -78,7 +78,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io } for _, staleUpload := range staleUploads { - deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerHost, commandEnv.option.FilerPort, uploadsDir, staleUpload) + deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload) fmt.Fprintf(writer, "purge %s\n", deleteUrl) err = util.Delete(deleteUrl, "") diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index df3067722..8098fabdf 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" @@ -325,7 +326,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, false) } return nil } diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 7e060f3d3..a7343e258 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -6,6 +6,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "io" @@ -108,10 +109,10 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *nee aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb() // read index db - if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil { + if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), *verbose, writer); err != nil { return err } - if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil { + if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), *verbose, writer); err != nil { return err } @@ -155,7 +156,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m for _, needleValue := range missingNeedles { - needleBlob, err := c.readSourceNeedleBlob(source.location.dataNode.Id, source.info.Id, needleValue) + needleBlob, err := c.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } @@ -170,7 +171,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m hasChanges = true - if err = c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil { + if err = c.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { return hasChanges, err } @@ -179,7 +180,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m return } -func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { +func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ @@ -197,7 +198,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, return } -func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { +func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ @@ -211,7 +212,7 @@ func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer stri } -func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer string, verbose bool, writer io.Writer) error { +func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer) error { var buf bytes.Buffer if err := c.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer); err != nil { @@ -226,7 +227,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect } -func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer string, buf *bytes.Buffer, verbose bool, writer io.Writer) error { +func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index e3f034873..ffc2656ea 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -83,7 +84,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } for _, dst := range allLocations { - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ VolumeId: uint32(vid), Replication: replicaPlacement.String(), diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index 85b4bb095..ca8f5c832 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -44,7 +45,7 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr + sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr) volumeId := needle.VolumeId(*volumeIdInt) diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go index 187caa1a4..4297b669b 100644 --- a/weed/shell/command_volume_delete.go +++ b/weed/shell/command_volume_delete.go @@ -2,6 +2,7 @@ package shell import ( "flag" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -41,7 +42,7 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i return nil } - sourceVolumeServer := *nodeStr + sourceVolumeServer := pb.ServerAddress(*nodeStr) volumeId := needle.VolumeId(*volumeIdInt) diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go index 079915f66..a20ac2f7e 100644 --- a/weed/shell/command_volume_delete_empty.go +++ b/weed/shell/command_volume_delete_empty.go @@ -2,6 +2,7 @@ package shell import ( "flag" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" @@ -58,7 +59,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri if v.Size <= 8 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { if *applyBalancing { log.Printf("deleting empty volume %d from %s", v.Id, dn.Id) - if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), dn.Id); deleteErr != nil { + if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(dn)); deleteErr != nil { err = deleteErr } continue diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index efd5ae5de..06e9ac003 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" @@ -147,7 +148,7 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma break } - if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil { + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil { return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) } @@ -200,10 +201,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co break } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, + SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), }) if replicateErr != nil { return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 9e778b918..a2c3e9fd6 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -5,6 +5,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" "io/ioutil" @@ -437,7 +438,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri } type VInfo struct { - server string + server pb.ServerAddress collection string isEcVolume bool } @@ -459,14 +460,14 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo for _, diskInfo := range t.DiskInfos { for _, vi := range diskInfo.VolumeInfos { volumeIdToServer[vi.Id] = VInfo{ - server: t.Id, + server: pb.NewServerAddressFromDataNode(t), collection: vi.Collection, isEcVolume: false, } } for _, ecShardInfo := range diskInfo.EcShardInfos { volumeIdToServer[ecShardInfo.Id] = VInfo{ - server: t.Id, + server: pb.NewServerAddressFromDataNode(t), collection: ecShardInfo.Collection, isEcVolume: true, } @@ -491,7 +492,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds [] var wg sync.WaitGroup for _, location := range locations { wg.Add(1) - go func(server string, fidList []string) { + go func(server pb.ServerAddress, fidList []string) { defer wg.Done() if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil { @@ -500,7 +501,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds [] resultChan <- deleteResults } - }(location.Url, fileIds) + }(location.ServerAddress(), fileIds) } wg.Wait() close(resultChan) diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go index 19b614310..5531f4e3d 100644 --- a/weed/shell/command_volume_mark.go +++ b/weed/shell/command_volume_mark.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -47,7 +48,7 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io. markWritable = true } - sourceVolumeServer := *nodeStr + sourceVolumeServer := pb.ServerAddress(*nodeStr) volumeId := needle.VolumeId(*volumeIdInt) diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index bd588d0b5..e46ef3683 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -3,6 +3,7 @@ package shell import ( "context" "flag" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -45,7 +46,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io return nil } - sourceVolumeServer := *nodeStr + sourceVolumeServer := pb.ServerAddress(*nodeStr) volumeId := needle.VolumeId(*volumeIdInt) @@ -53,7 +54,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } -func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { +func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 3dcf41354..c404d8e30 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/wdclient" "io" "log" @@ -62,7 +63,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr + sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr) volumeId := needle.VolumeId(*volumeIdInt) @@ -74,7 +75,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. } // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. -func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) { +func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) { log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType) @@ -100,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n return nil } -func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) { +func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) { // check to see if the volume is already read-only and if its not then we need // to mark it as read-only and then before we return we need to undo what we @@ -142,7 +143,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), - SourceDataNode: sourceVolumeServer, + SourceDataNode: string(sourceVolumeServer), DiskType: diskType, }) if replicateErr == nil { @@ -154,21 +155,21 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source return } -func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { +func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, IdleTimeoutSeconds: uint32(idleTimeout.Seconds()), - SourceVolumeServer: sourceVolumeServer, + SourceVolumeServer: string(sourceVolumeServer), }) return replicateErr }) } -func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { +func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), @@ -177,7 +178,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour }) } -func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) { +func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ @@ -195,7 +196,7 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error { for _, location := range locations { fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) - if err := markVolumeWritable(grpcDialOption, volumeId, location.Url, writable); err != nil { + if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil { return err } } diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go index 2a2e56e86..7f7b7148b 100644 --- a/weed/shell/command_volume_server_leave.go +++ b/weed/shell/command_volume_server_leave.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" "io" @@ -50,11 +51,11 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri return fmt.Errorf("need to specify volume server by -node=<host>:<port>") } - return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer) + return volumeServerLeave(commandEnv.option.GrpcDialOption, pb.ServerAddress(*volumeServer), writer) } -func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) { +func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) { return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{}) if leaveErr != nil { diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 33166ce65..bfb78608f 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "google.golang.org/grpc" @@ -112,7 +113,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s // TODO parallelize this for _, loc := range locations { // copy the .dat file from remote tier to local - err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url) + err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.ServerAddress()) if err != nil { return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err) } @@ -121,7 +122,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s return nil } -func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { +func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error { err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index c72958259..33b51ddf2 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/wdclient" @@ -20,7 +21,7 @@ func init() { } type commandVolumeTierMove struct { - activeServers map[string]struct{} + activeServers map[pb.ServerAddress]struct{} activeServersLock sync.Mutex activeServersCond *sync.Cond } @@ -42,7 +43,7 @@ func (c *commandVolumeTierMove) Help() string { func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - c.activeServers = make(map[string]struct{}) + c.activeServers = make(map[pb.ServerAddress]struct{}) c.activeServersCond = sync.NewCond(new(sync.Mutex)) if err = commandEnv.confirmIsLocked(); err != nil { @@ -117,10 +118,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer if isOneOf(dst.dataNode.Id, locations) { continue } - sourceVolumeServer := "" + var sourceVolumeServer pb.ServerAddress for _, loc := range locations { if loc.Url != dst.dataNode.Id { - sourceVolumeServer = loc.Url + sourceVolumeServer = loc.ServerAddress() } } if sourceVolumeServer == "" { @@ -135,16 +136,17 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer break } + destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode) c.activeServersCond.L.Lock() _, isSourceActive := c.activeServers[sourceVolumeServer] - _, isDestActive := c.activeServers[dst.dataNode.Id] + _, isDestActive := c.activeServers[destServerAddress] for isSourceActive || isDestActive { c.activeServersCond.Wait() _, isSourceActive = c.activeServers[sourceVolumeServer] - _, isDestActive = c.activeServers[dst.dataNode.Id] + _, isDestActive = c.activeServers[destServerAddress] } c.activeServers[sourceVolumeServer] = struct{}{} - c.activeServers[dst.dataNode.Id] = struct{}{} + c.activeServers[destServerAddress] = struct{}{} c.activeServersCond.L.Unlock() wg.Add(1) @@ -153,7 +155,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err) } delete(c.activeServers, sourceVolumeServer) - delete(c.activeServers, dst.dataNode.Id) + delete(c.activeServers, destServerAddress) c.activeServersCond.Signal() wg.Done() }(dst) @@ -170,13 +172,13 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer return nil } -func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) { +func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) { // mark all replicas as read only if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } - if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil { + if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil { // mark all replicas as writable if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil { @@ -191,8 +193,8 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i // remove the remaining replicas for _, loc := range locations { - if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer { - if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil { + if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer { + if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil { fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err) } } diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index c8540a7dc..d50178cb9 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "time" @@ -107,7 +108,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str } // copy the .dat file to remote tier - err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile) + err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].ServerAddress(), dest, keepLocalDatFile) if err != nil { return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err) } @@ -115,7 +116,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str return nil } -func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { +func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error { err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index f7e5a501b..5fc158b76 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -3,6 +3,7 @@ package shell import ( "context" "flag" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "github.com/chrislusf/seaweedfs/weed/operation" @@ -45,7 +46,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer return nil } - sourceVolumeServer := *nodeStr + sourceVolumeServer := pb.ServerAddress(*nodeStr) volumeId := needle.VolumeId(*volumeIdInt) @@ -53,7 +54,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } -func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { +func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/commands.go b/weed/shell/commands.go index aef71b419..18f357ac7 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -22,7 +22,7 @@ type ShellOptions struct { // shell transient context FilerHost string FilerPort int64 - FilerAddress string + FilerAddress pb.ServerAddress Directory string } @@ -46,7 +46,7 @@ var ( func NewCommandEnv(options ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) @@ -98,8 +98,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{}) func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - filerGrpcAddress := util.JoinHostPort(ce.option.FilerHost, int(ce.option.FilerPort+10000)) - return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn) + return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn) } |
