aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_ec_balance.go5
-rw-r--r--weed/shell/command_ec_common.go26
-rw-r--r--weed/shell/command_ec_decode.go15
-rw-r--r--weed/shell/command_ec_encode.go13
-rw-r--r--weed/shell/command_ec_rebuild.go11
-rw-r--r--weed/shell/command_fs_meta_save.go6
-rw-r--r--weed/shell/command_s3_clean_uploads.go2
-rw-r--r--weed/shell/command_volume_balance.go3
-rw-r--r--weed/shell/command_volume_check_disk.go17
-rw-r--r--weed/shell/command_volume_configure_replication.go3
-rw-r--r--weed/shell/command_volume_copy.go3
-rw-r--r--weed/shell/command_volume_delete.go3
-rw-r--r--weed/shell/command_volume_delete_empty.go3
-rw-r--r--weed/shell/command_volume_fix_replication.go7
-rw-r--r--weed/shell/command_volume_fsck.go11
-rw-r--r--weed/shell/command_volume_mark.go3
-rw-r--r--weed/shell/command_volume_mount.go5
-rw-r--r--weed/shell/command_volume_move.go19
-rw-r--r--weed/shell/command_volume_server_leave.go5
-rw-r--r--weed/shell/command_volume_tier_download.go5
-rw-r--r--weed/shell/command_volume_tier_move.go26
-rw-r--r--weed/shell/command_volume_tier_upload.go5
-rw-r--r--weed/shell/command_volume_unmount.go5
-rw-r--r--weed/shell/commands.go7
24 files changed, 116 insertions, 92 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index b1ca926d5..d15c69d43 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"sort"
@@ -215,10 +216,10 @@ func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
- if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
- if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index fd35bb14b..51c8c32cd 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math"
"sort"
@@ -22,20 +23,22 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
if applyBalancing {
+ existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
+
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
- err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
@@ -53,13 +56,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
+ err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- if targetServer.info.Id != existingLocation {
+ if targetAddress != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
@@ -69,7 +73,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcxFile: true,
CopyEcjFile: true,
CopyVifFile: true,
- SourceDataNode: existingLocation,
+ SourceDataNode: string(existingLocation),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -86,7 +90,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
}
- if targetServer.info.Id != existingLocation {
+ if targetAddress != existingLocation {
copiedShardIds = shardIdsToCopy
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
}
@@ -233,7 +237,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
return
}
-func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
@@ -248,7 +252,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
}
-func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
@@ -261,7 +265,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s
})
}
-func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index e4d597d84..68b7820b2 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -100,7 +101,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil
}
-func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -132,7 +133,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, ta
return nil
}
-func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
@@ -148,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
}
-func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
@@ -185,7 +186,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasur
CopyEcxFile: false,
CopyEcjFile: true,
CopyVifFile: true,
- SourceDataNode: loc,
+ SourceDataNode: string(loc),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
@@ -243,14 +244,14 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
return
}
-func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[pb.ServerAddress]erasure_coding.ShardBits {
- nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
+ nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
- nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
}
}
}
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 014b9bab7..39ca39a4f 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"sync"
"time"
@@ -106,7 +107,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
}
// generate ec shards
- err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
+ err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
@@ -120,7 +121,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
return nil
}
-func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
@@ -161,13 +162,13 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
}
// unmount the to be deleted shards
- err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
@@ -175,7 +176,7 @@ func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
- err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress())
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -194,7 +195,7 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer
copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
- allocatedEcShardIds, volumeId, collection, existingLocation.Url)
+ allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress())
if copyErr != nil {
err = copyErr
} else {
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 8d5d7bb91..3bdb9b4a0 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -141,7 +142,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
// clean up working files
// ask the rebuilder to delete the copied shards
- err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), copiedShardIds)
if err != nil {
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
}
@@ -153,13 +154,13 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
}
// generate ec shards, and maybe ecx file
- generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+ generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info))
if err != nil {
return err
}
// mount the generated shards
- err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+ err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(rebuilder.info), generatedShardIds)
if err != nil {
return err
}
@@ -169,7 +170,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
return nil
}
-func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
@@ -212,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index d7cc2efef..2cbe83e21 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -63,8 +63,8 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
fileName := *outputFileName
if fileName == "" {
t := time.Now()
- fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
- commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+ fileName = fmt.Sprintf("%s-%4d%02d%02d-%02d%02d%02d.meta",
+ commandEnv.option.FilerAddress.ToHttpAddress(), t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
}
dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
@@ -105,7 +105,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
})
if err == nil {
- fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName)
+ fmt.Fprintf(writer, "meta data for http://%s%s is saved to %s\n", commandEnv.option.FilerAddress.ToHttpAddress(), path, fileName)
}
return err
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
index 1ba31292c..4f893df7a 100644
--- a/weed/shell/command_s3_clean_uploads.go
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -78,7 +78,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
}
for _, staleUpload := range staleUploads {
- deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerHost, commandEnv.option.FilerPort, uploadsDir, staleUpload)
+ deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
err = util.Delete(deleteUrl, "")
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index df3067722..8098fabdf 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -325,7 +326,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
- return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType, false)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, os.Stderr, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(fullNode.info), pb.NewServerAddressFromDataNode(emptyNode.info), 5*time.Second, v.DiskType, false)
}
return nil
}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 7e060f3d3..a7343e258 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"io"
@@ -108,10 +109,10 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *nee
aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
// read index db
- if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
+ if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), *verbose, writer); err != nil {
return err
}
- if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
+ if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), *verbose, writer); err != nil {
return err
}
@@ -155,7 +156,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
for _, needleValue := range missingNeedles {
- needleBlob, err := c.readSourceNeedleBlob(source.location.dataNode.Id, source.info.Id, needleValue)
+ needleBlob, err := c.readSourceNeedleBlob(pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue)
if err != nil {
return hasChanges, err
}
@@ -170,7 +171,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
hasChanges = true
- if err = c.writeNeedleBlobToTarget(target.location.dataNode.Id, source.info.Id, needleValue, needleBlob); err != nil {
+ if err = c.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil {
return hasChanges, err
}
@@ -179,7 +180,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_m
return
}
-func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
+func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
@@ -197,7 +198,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer string,
return
}
-func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer string, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
+func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
@@ -211,7 +212,7 @@ func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer stri
}
-func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer string, verbose bool, writer io.Writer) error {
+func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer) error {
var buf bytes.Buffer
if err := c.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer); err != nil {
@@ -226,7 +227,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
}
-func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer string, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
+func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index e3f034873..ffc2656ea 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -83,7 +84,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 85b4bb095..ca8f5c832 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -44,7 +45,7 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
- sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr
+ sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 187caa1a4..4297b669b 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -2,6 +2,7 @@ package shell
import (
"flag"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -41,7 +42,7 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
return nil
}
- sourceVolumeServer := *nodeStr
+ sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
diff --git a/weed/shell/command_volume_delete_empty.go b/weed/shell/command_volume_delete_empty.go
index 079915f66..a20ac2f7e 100644
--- a/weed/shell/command_volume_delete_empty.go
+++ b/weed/shell/command_volume_delete_empty.go
@@ -2,6 +2,7 @@ package shell
import (
"flag"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
@@ -58,7 +59,7 @@ func (c *commandVolumeDeleteEmpty) Do(args []string, commandEnv *CommandEnv, wri
if v.Size <= 8 && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if *applyBalancing {
log.Printf("deleting empty volume %d from %s", v.Id, dn.Id)
- if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), dn.Id); deleteErr != nil {
+ if deleteErr := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), pb.NewServerAddressFromDataNode(dn)); deleteErr != nil {
err = deleteErr
}
continue
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index efd5ae5de..06e9ac003 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
@@ -147,7 +148,7 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
break
}
- if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil {
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
}
@@ -200,10 +201,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
- SourceDataNode: replica.location.dataNode.Id,
+ SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 9e778b918..a2c3e9fd6 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -5,6 +5,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
"io/ioutil"
@@ -437,7 +438,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
}
type VInfo struct {
- server string
+ server pb.ServerAddress
collection string
isEcVolume bool
}
@@ -459,14 +460,14 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
for _, diskInfo := range t.DiskInfos {
for _, vi := range diskInfo.VolumeInfos {
volumeIdToServer[vi.Id] = VInfo{
- server: t.Id,
+ server: pb.NewServerAddressFromDataNode(t),
collection: vi.Collection,
isEcVolume: false,
}
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeIdToServer[ecShardInfo.Id] = VInfo{
- server: t.Id,
+ server: pb.NewServerAddressFromDataNode(t),
collection: ecShardInfo.Collection,
isEcVolume: true,
}
@@ -491,7 +492,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
var wg sync.WaitGroup
for _, location := range locations {
wg.Add(1)
- go func(server string, fidList []string) {
+ go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
@@ -500,7 +501,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
resultChan <- deleteResults
}
- }(location.Url, fileIds)
+ }(location.ServerAddress(), fileIds)
}
wg.Wait()
close(resultChan)
diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go
index 19b614310..5531f4e3d 100644
--- a/weed/shell/command_volume_mark.go
+++ b/weed/shell/command_volume_mark.go
@@ -3,6 +3,7 @@ package shell
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -47,7 +48,7 @@ func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.
markWritable = true
}
- sourceVolumeServer := *nodeStr
+ sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index bd588d0b5..e46ef3683 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -3,6 +3,7 @@ package shell
import (
"context"
"flag"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -45,7 +46,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
return nil
}
- sourceVolumeServer := *nodeStr
+ sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -53,7 +54,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
}
-func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 3dcf41354..c404d8e30 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"log"
@@ -62,7 +63,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
- sourceVolumeServer, targetVolumeServer := *sourceNodeStr, *targetNodeStr
+ sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -74,7 +75,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, skipTailError bool) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType)
@@ -100,7 +101,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId n
return nil
}
-func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string) (lastAppendAtNs uint64, err error) {
// check to see if the volume is already read-only and if its not then we need
// to mark it as read-only and then before we return we need to undo what we
@@ -142,7 +143,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
- SourceDataNode: sourceVolumeServer,
+ SourceDataNode: string(sourceVolumeServer),
DiskType: diskType,
})
if replicateErr == nil {
@@ -154,21 +155,21 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
return
}
-func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
- SourceVolumeServer: sourceVolumeServer,
+ SourceVolumeServer: string(sourceVolumeServer),
})
return replicateErr
})
}
-func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
@@ -177,7 +178,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
})
}
-func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) {
+func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
@@ -195,7 +196,7 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable bool) error {
for _, location := range locations {
fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
- if err := markVolumeWritable(grpcDialOption, volumeId, location.Url, writable); err != nil {
+ if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable); err != nil {
return err
}
}
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
index 2a2e56e86..7f7b7148b 100644
--- a/weed/shell/command_volume_server_leave.go
+++ b/weed/shell/command_volume_server_leave.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"io"
@@ -50,11 +51,11 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri
return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
}
- return volumeServerLeave(commandEnv.option.GrpcDialOption, *volumeServer, writer)
+ return volumeServerLeave(commandEnv.option.GrpcDialOption, pb.ServerAddress(*volumeServer), writer)
}
-func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer string, writer io.Writer) (err error) {
+func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 33166ce65..bfb78608f 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"google.golang.org/grpc"
@@ -112,7 +113,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
// TODO parallelize this
for _, loc := range locations {
// copy the .dat file from remote tier to local
- err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+ err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.ServerAddress())
if err != nil {
return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
}
@@ -121,7 +122,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
return nil
}
-func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error {
err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index c72958259..33b51ddf2 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -20,7 +21,7 @@ func init() {
}
type commandVolumeTierMove struct {
- activeServers map[string]struct{}
+ activeServers map[pb.ServerAddress]struct{}
activeServersLock sync.Mutex
activeServersCond *sync.Cond
}
@@ -42,7 +43,7 @@ func (c *commandVolumeTierMove) Help() string {
func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- c.activeServers = make(map[string]struct{})
+ c.activeServers = make(map[pb.ServerAddress]struct{})
c.activeServersCond = sync.NewCond(new(sync.Mutex))
if err = commandEnv.confirmIsLocked(); err != nil {
@@ -117,10 +118,10 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
if isOneOf(dst.dataNode.Id, locations) {
continue
}
- sourceVolumeServer := ""
+ var sourceVolumeServer pb.ServerAddress
for _, loc := range locations {
if loc.Url != dst.dataNode.Id {
- sourceVolumeServer = loc.Url
+ sourceVolumeServer = loc.ServerAddress()
}
}
if sourceVolumeServer == "" {
@@ -135,16 +136,17 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
break
}
+ destServerAddress := pb.NewServerAddressFromDataNode(dst.dataNode)
c.activeServersCond.L.Lock()
_, isSourceActive := c.activeServers[sourceVolumeServer]
- _, isDestActive := c.activeServers[dst.dataNode.Id]
+ _, isDestActive := c.activeServers[destServerAddress]
for isSourceActive || isDestActive {
c.activeServersCond.Wait()
_, isSourceActive = c.activeServers[sourceVolumeServer]
- _, isDestActive = c.activeServers[dst.dataNode.Id]
+ _, isDestActive = c.activeServers[destServerAddress]
}
c.activeServers[sourceVolumeServer] = struct{}{}
- c.activeServers[dst.dataNode.Id] = struct{}{}
+ c.activeServers[destServerAddress] = struct{}{}
c.activeServersCond.L.Unlock()
wg.Add(1)
@@ -153,7 +155,7 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "move volume %d %s => %s: %v\n", vid, sourceVolumeServer, dst.dataNode.Id, err)
}
delete(c.activeServers, sourceVolumeServer)
- delete(c.activeServers, dst.dataNode.Id)
+ delete(c.activeServers, destServerAddress)
c.activeServersCond.Signal()
wg.Done()
}(dst)
@@ -170,13 +172,13 @@ func (c *commandVolumeTierMove) doVolumeTierMove(commandEnv *CommandEnv, writer
return nil
}
-func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer string, dst location) (err error) {
+func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer io.Writer, vid needle.VolumeId, toDiskType types.DiskType, locations []wdclient.Location, sourceVolumeServer pb.ServerAddress, dst location) (err error) {
// mark all replicas as read only
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
- if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString(), true); err != nil {
+ if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, vid, sourceVolumeServer, pb.NewServerAddressFromDataNode(dst.dataNode), 5*time.Second, toDiskType.ReadableString(), true); err != nil {
// mark all replicas as writable
if err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, true); err != nil {
@@ -191,8 +193,8 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
// remove the remaining replicas
for _, loc := range locations {
- if loc.Url != dst.dataNode.Id && loc.Url != sourceVolumeServer {
- if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
+ if loc.Url != dst.dataNode.Id && loc.ServerAddress() != sourceVolumeServer {
+ if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.ServerAddress()); err != nil {
fmt.Fprintf(writer, "failed to delete volume %d on %s: %v\n", vid, loc.Url, err)
}
}
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index c8540a7dc..d50178cb9 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"time"
@@ -107,7 +108,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
}
// copy the .dat file to remote tier
- err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+ err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].ServerAddress(), dest, keepLocalDatFile)
if err != nil {
return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
}
@@ -115,7 +116,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return nil
}
-func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index f7e5a501b..5fc158b76 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -3,6 +3,7 @@ package shell
import (
"context"
"flag"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -45,7 +46,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
- sourceVolumeServer := *nodeStr
+ sourceVolumeServer := pb.ServerAddress(*nodeStr)
volumeId := needle.VolumeId(*volumeIdInt)
@@ -53,7 +54,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
}
-func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index aef71b419..18f357ac7 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -22,7 +22,7 @@ type ShellOptions struct {
// shell transient context
FilerHost string
FilerPort int64
- FilerAddress string
+ FilerAddress pb.ServerAddress
Directory string
}
@@ -46,7 +46,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
@@ -98,8 +98,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{})
func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- filerGrpcAddress := util.JoinHostPort(ce.option.FilerHost, int(ce.option.FilerPort+10000))
- return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn)
+ return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}