| author | a <eddy@gfxlabs.io> | 2022-04-20 14:01:42 -0700 |
|---|---|---|
| committer | a <eddy@gfxlabs.io> | 2022-04-20 14:01:42 -0700 |
| commit | 1d6a9e66b608f77a0da9a6903802bb24ff0629d7 (patch) | |
| tree | 7f3e02d6e69d10913d882c5f87d9156001e1b77c /weed/shell | |
| parent | 846858fb436cc061c40c4f2565ed3682e3758596 (diff) | |
| parent | d1fd40358215a6237f51e0918659f74cc7269ff1 (diff) | |
Merge branch 'master' into a
Diffstat (limited to 'weed/shell')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/shell/command_cluster_check.go | 72 |
| -rw-r--r-- | weed/shell/command_ec_balance.go | 15 |
| -rw-r--r-- | weed/shell/command_ec_common.go | 17 |
| -rw-r--r-- | weed/shell/command_fs_meta_cat.go | 15 |
| -rw-r--r-- | weed/shell/command_volume_balance.go | 15 |
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 6 |
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 10 |
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 237 |
| -rw-r--r-- | weed/shell/command_volume_list.go | 70 |
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 13 |
| -rw-r--r-- | weed/shell/command_volume_vacuum.go | 6 |
| -rw-r--r-- | weed/shell/shell_liner.go | 8 |
12 files changed, 291 insertions, 193 deletions
```diff
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go
index c8fe110ba..616669b6d 100644
--- a/weed/shell/command_cluster_check.go
+++ b/weed/shell/command_cluster_check.go
@@ -7,10 +7,9 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/cluster"
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"io"
-
-	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 )
 
 func init() {
@@ -98,15 +97,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 	for _, volumeServer := range volumeServers {
 		fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
 		err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
-			_, err := client.Ping(context.Background(), &master_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
 				Target:     string(volumeServer),
 				TargetType: cluster.VolumeServerType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -120,15 +120,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 		}
 		fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
 		err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error {
-			_, err := client.Ping(context.Background(), &master_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
 				Target:     string(targetMaster),
 				TargetType: cluster.MasterType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -139,15 +140,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 	for _, master := range masters {
 		fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
 		err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-			_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
 				Target:     string(master),
 				TargetType: cluster.MasterType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -158,15 +160,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 	for _, master := range masters {
 		fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
 		err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-			_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
 				Target:     string(master),
 				TargetType: cluster.MasterType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -177,15 +180,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 	for _, volumeServer := range volumeServers {
 		fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
 		err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-			_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
 				Target:     string(volumeServer),
 				TargetType: cluster.VolumeServerType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -199,15 +203,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 		}
 		fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
 		err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-			_, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
 				Target:     string(targetVolumeServer),
 				TargetType: cluster.VolumeServerType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -218,15 +223,16 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 	for _, targetFiler := range filers {
 		fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
 		err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-			_, err := client.Ping(context.Background(), &filer_pb.PingRequest{
+			pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
 				Target:     string(targetFiler),
 				TargetType: cluster.FilerType,
 			})
+			if err == nil {
+				printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
+			}
 			return err
 		})
-		if err == nil {
-			fmt.Fprintf(writer, "ok\n")
-		} else {
+		if err != nil {
 			fmt.Fprintf(writer, "%v\n", err)
 		}
 	}
@@ -234,3 +240,9 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
 
 	return nil
 }
+
+func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) {
+	roundTripTimeMs := float32(stopNs-startNs) / 1000000
+	deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000
+	fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
+}
```
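The `printTiming` helper added above derives both numbers from a single ping exchange: the round trip is the local receive-minus-send time, and the clock delta compares the remote timestamp against the midpoint of the local send/receive pair, which is only an unbiased estimate when the network path is roughly symmetric. A minimal standalone sketch of the same arithmetic, with hypothetical timestamps:

```go
package main

import "fmt"

func main() {
	// Hypothetical nanosecond timestamps, for illustration only.
	var startNs, remoteNs, stopNs int64 = 1_000_000_000, 1_003_500_000, 1_006_000_000

	// Same arithmetic as printTiming: round trip is stop minus start; the
	// remote clock is compared against the midpoint of the local pair.
	roundTripTimeMs := float32(stopNs-startNs) / 1000000
	deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000

	fmt.Printf("ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
	// Output: ok round trip 6.000ms clock delta 0.500ms
}
```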
```diff
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 6cd91119b..393d44b80 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -4,12 +4,11 @@ import (
 	"flag"
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/pb"
-	"github.com/chrislusf/seaweedfs/weed/storage/types"
-	"io"
-	"sort"
-
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"golang.org/x/exp/slices"
+	"io"
 )
 
 func init() {
@@ -411,8 +410,8 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
 	hasMove := true
 	for hasMove {
 		hasMove = false
-		sort.Slice(rackEcNodes, func(i, j int) bool {
-			return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot
+		slices.SortFunc(rackEcNodes, func(a, b *EcNode) bool {
+			return a.freeEcSlot > b.freeEcSlot
 		})
 		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
 		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
@@ -492,8 +491,8 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
 			})
 		}
 	}
-	sort.Slice(candidateEcNodes, func(i, j int) bool {
-		return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
+	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) bool {
+		return a.shardCount > b.shardCount
 	})
 	for i := 0; i < n; i++ {
 		selectedEcNodeIndex := -1
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index b3bd0ce5d..27b650731 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,18 +3,17 @@ package shell
 import (
 	"context"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/pb"
-	"github.com/chrislusf/seaweedfs/weed/storage/types"
-	"math"
-	"sort"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"golang.org/x/exp/slices"
 	"google.golang.org/grpc"
+	"math"
 )
 
 func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
@@ -116,14 +115,14 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId,
 }
 
 func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
-	sort.Slice(ecNodes, func(i, j int) bool {
-		return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
+	slices.SortFunc(ecNodes, func(a, b *EcNode) bool {
+		return a.freeEcSlot > b.freeEcSlot
 	})
 }
 
 func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
-	sort.Slice(ecNodes, func(i, j int) bool {
-		return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
+	slices.SortFunc(ecNodes, func(a, b *EcNode) bool {
+		return a.freeEcSlot < b.freeEcSlot
 	})
 }
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index a7de6d3ef..4616c072d 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -2,11 +2,10 @@ package shell
 
 import (
 	"fmt"
+	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
+	"golang.org/x/exp/slices"
 	"io"
-	"sort"
-
-	"github.com/golang/protobuf/jsonpb"
 
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -55,14 +54,12 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
 			EmitDefaults: true,
 			Indent:       "  ",
 		}
-
-		sort.Slice(respLookupEntry.Entry.Chunks, func(i, j int) bool {
-			if respLookupEntry.Entry.Chunks[i].Offset == respLookupEntry.Entry.Chunks[j].Offset {
-				return respLookupEntry.Entry.Chunks[i].Mtime < respLookupEntry.Entry.Chunks[j].Mtime
+		slices.SortFunc(respLookupEntry.Entry.Chunks, func(a, b *filer_pb.FileChunk) bool {
+			if a.Offset == b.Offset {
+				return a.Mtime < b.Mtime
 			}
-			return respLookupEntry.Entry.Chunks[i].Offset < respLookupEntry.Entry.Chunks[j].Offset
+			return a.Offset < b.Offset
 		})
-
 		text, marshalErr := m.MarshalToString(respLookupEntry.Entry)
 		if marshalErr != nil {
 			return fmt.Errorf("marshal meta: %v", marshalErr)
```
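The sort changes in these files are one mechanical migration repeated throughout the patch: `sort.Slice` with an index-based closure becomes `slices.SortFunc` from `golang.org/x/exp/slices`, whose comparator receives the elements themselves. A self-contained before/after sketch — `ecNode` is a hypothetical stand-in struct, not the shell package's `EcNode`, and note that the `x/exp/slices` API of this era took a `func(a, b T) bool` less function, while later versions switched to a `func(a, b T) int` comparator:

```go
package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/slices"
)

type ecNode struct {
	id         string
	freeEcSlot int
}

func main() {
	nodes := []ecNode{{"b", 3}, {"a", 7}, {"c", 5}}

	// Before: the comparator receives indexes and must re-index the slice.
	sort.Slice(nodes, func(i, j int) bool {
		return nodes[i].freeEcSlot > nodes[j].freeEcSlot
	})

	// After: the comparator receives typed elements directly. This is the
	// less-style x/exp/slices signature this commit targets.
	slices.SortFunc(nodes, func(a, b ecNode) bool {
		return a.freeEcSlot > b.freeEcSlot
	})

	fmt.Println(nodes) // [{a 7} {c 5} {b 3}]
}
```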
```diff
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 7a983de1a..b01d348c5 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -6,9 +6,9 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"golang.org/x/exp/slices"
 	"io"
 	"os"
-	"sort"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -224,14 +224,14 @@ func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool
 }
 
 func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
-	sort.Slice(volumes, func(i, j int) bool {
-		return volumes[i].Size < volumes[j].Size
+	slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool {
+		return a.Size < b.Size
 	})
 }
 
 func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
-	sort.Slice(volumes, func(i, j int) bool {
-		return volumes[i].Id < volumes[j].Id
+	slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool {
+		return a.Id < b.Id
 	})
 }
@@ -255,10 +255,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu
 
 	for hasMoved {
 		hasMoved = false
-		sort.Slice(nodesWithCapacity, func(i, j int) bool {
-			return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc)
+		slices.SortFunc(nodesWithCapacity, func(a, b *Node) bool {
+			return a.localVolumeRatio(capacityFunc) < b.localVolumeRatio(capacityFunc)
 		})
-
 		fullNode := nodesWithCapacity[len(nodesWithCapacity)-1]
 		var candidateVolumes []*master_pb.VolumeInformationMessage
 		for _, v := range fullNode.selectedVolumes {
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 54edd53dd..53284096d 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -9,9 +9,9 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+	"golang.org/x/exp/slices"
 	"io"
 	"math"
-	"sort"
 )
 
 func init() {
@@ -70,8 +70,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
 	}
 
 	for _, replicas := range volumeReplicas {
-		sort.Slice(replicas, func(i, j int) bool {
-			return fileCount(replicas[i]) > fileCount(replicas[j])
+		slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
+			return fileCount(a) > fileCount(b)
 		})
 		for len(replicas) >= 2 {
 			a, b := replicas[0], replicas[1]
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 78285d8a5..c4bef5925 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -7,9 +7,9 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"golang.org/x/exp/slices"
 	"io"
 	"path/filepath"
-	"sort"
 	"strconv"
 	"time"
 
@@ -308,8 +308,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
 
 func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
 	fn := capacityByFreeVolumeCount(diskType)
-	sort.Slice(dataNodes, func(i, j int) bool {
-		return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
+	slices.SortFunc(dataNodes, func(a, b location) bool {
+		return fn(a.dataNode) > fn(b.dataNode)
 	})
 }
 
@@ -488,9 +488,7 @@ func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[st
 }
 
 func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
-
-	sort.Slice(replicas, func(i, j int) bool {
-		a, b := replicas[i], replicas[j]
+	slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
 		if a.info.Size != b.info.Size {
 			return a.info.Size < b.info.Size
 		}
```
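`pickOneReplicaToDelete` above keeps its multi-key ordering while switching APIs: compare on the primary key and fall through to the next key only on ties, so the choice of replica stays deterministic. A small sketch of that tie-breaker shape, using hypothetical fields and the era's less-style `x/exp/slices` signature:

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

// replica is a hypothetical stand-in for the patch's *VolumeReplica.
type replica struct {
	size    uint64
	modTime int64
}

func main() {
	replicas := []replica{{100, 9}, {100, 3}, {50, 7}}
	slices.SortFunc(replicas, func(a, b replica) bool {
		if a.size != b.size {
			return a.size < b.size // primary key: smaller replicas first
		}
		return a.modTime < b.modTime // tie-breaker keeps the order stable
	})
	fmt.Println(replicas) // [{50 7} {100 3} {100 9}]
}
```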
```diff
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 7d3aa28a5..1aa33e054 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -5,16 +5,6 @@ import (
 	"context"
 	"flag"
 	"fmt"
-	"io"
-	"io/ioutil"
-	"math"
-	"net/http"
-	"net/url"
-	"os"
-	"path/filepath"
-	"strings"
-	"sync"
-
 	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb"
@@ -25,6 +15,17 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"io"
+	"io/ioutil"
+	"math"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
 )
 
 func init() {
@@ -65,8 +66,11 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	verbose := fsckCommand.Bool("v", false, "verbose mode")
 	findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
 	findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
+	findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler")
 	applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
 	purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
+	tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
+
 	if err = fsckCommand.Parse(args); err != nil {
 		return nil
 	}
@@ -78,7 +82,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	c.env = commandEnv
 
 	// create a temp folder
-	tempFolder, err := os.MkdirTemp("", "sw_fsck")
+	tempFolder, err := os.MkdirTemp(*tempPath, "sw_fsck")
 	if err != nil {
 		return fmt.Errorf("failed to create temp folder: %v", err)
 	}
@@ -88,14 +92,14 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	defer os.RemoveAll(tempFolder)
 
 	// collect all volume id locations
-	volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
+	dataNodeVolumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
 	if err != nil {
 		return fmt.Errorf("failed to collect all volume locations: %v", err)
 	}
 
 	isBucketsPath := false
 	var fillerBucketsPath string
-	if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "" {
+	if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "/" {
 		fillerBucketsPath, err = readFilerBucketsPath(commandEnv)
 		if err != nil {
 			return fmt.Errorf("read filer buckets path: %v", err)
@@ -108,34 +112,43 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 		return fmt.Errorf("read filer buckets path: %v", err)
 	}
 
+	collectMtime := time.Now().Unix()
 	// collect each volume file ids
-	for volumeId, vinfo := range volumeIdToVInfo {
-		if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) {
-			delete(volumeIdToVInfo, volumeId)
-			continue
-		}
-		err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
-		if err != nil {
-			return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+	for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
+		for volumeId, vinfo := range volumeIdToVInfo {
+			if *findMissingChunksInVolumeId > 0 && uint32(*findMissingChunksInVolumeId) != volumeId {
+				delete(volumeIdToVInfo, volumeId)
+				continue
+			}
+			if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) {
+				delete(volumeIdToVInfo, volumeId)
+				continue
+			}
+			err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer)
+			if err != nil {
+				return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+			}
 		}
 	}
 
 	if *findMissingChunksInFiler {
 		// collect all filer file ids and paths
-		if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent); err != nil {
+		if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent, collectMtime); err != nil {
 			return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
 		}
-		// for each volume, check filer file ids
-		if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
-			return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
+		for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
+			// for each volume, check filer file ids
+			if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, dataNodeId, writer, *verbose, *applyPurging); err != nil {
+				return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
+			}
 		}
 	} else {
 		// collect all filer file ids
-		if err = c.collectFilerFileIds(volumeIdToVInfo, tempFolder, writer, *verbose); err != nil {
+		if err = c.collectFilerFileIds(dataNodeVolumeIdToVInfo, tempFolder, writer, *verbose); err != nil {
 			return fmt.Errorf("failed to collect file ids from filer: %v", err)
 		}
 		// volume file ids subtract filer file ids
-		if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
+		if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
 			return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
 		}
 	}
@@ -143,19 +156,24 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	return nil
 }
 
-func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool) error {
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool, collectMtime int64) error {
 
 	if verbose {
 		fmt.Fprintf(writer, "checking each file from filer ...\n")
 	}
 
 	files := make(map[uint32]*os.File)
-	for vid := range volumeIdToServer {
-		dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
-		if openErr != nil {
-			return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+	for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
+		for vid := range volumeIdToServer {
+			if _, ok := files[vid]; ok {
+				continue
+			}
+			dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+			if openErr != nil {
+				return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+			}
+			files[vid] = dst
 		}
-		files[vid] = dst
 	}
 	defer func() {
 		for _, f := range files {
@@ -179,6 +197,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
 			}
 			dataChunks = append(dataChunks, manifestChunks...)
 			for _, chunk := range dataChunks {
+				if chunk.Mtime > collectMtime {
+					continue
+				}
 				outputChan <- &Item{
 					vid:     chunk.Fid.VolumeId,
 					fileKey: chunk.Fid.FileKey,
@@ -210,10 +231,10 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
 
 }
 
-func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
+func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, dataNodeId string, writer io.Writer, verbose bool, applyPurging bool) error {
 
 	for volumeId, vinfo := range volumeIdToVInfo {
-		checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose, applyPurging)
+		checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, dataNodeId, volumeId, writer, verbose, applyPurging)
 		if checkErr != nil {
 			return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
 		}
@@ -221,55 +242,93 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
 	return nil
 }
 
-func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
 
 	var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
+	volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
+	isSeveralReplicas := make(map[uint32]bool)
+	isEcVolumeReplicas := make(map[uint32]bool)
+	isReadOnlyReplicas := make(map[uint32]bool)
+	serverReplicas := make(map[uint32][]pb.ServerAddress)
+	for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
+		for volumeId, vinfo := range volumeIdToVInfo {
+			inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, dataNodeId, volumeId, writer, verbose)
+			if checkErr != nil {
+				return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+			}
+			isSeveralReplicas[volumeId] = false
+			if _, found := volumeIdOrphanFileIds[volumeId]; !found {
+				volumeIdOrphanFileIds[volumeId] = make(map[string]bool)
+			} else {
+				isSeveralReplicas[volumeId] = true
+			}
+			for _, fid := range orphanFileIds {
+				if isSeveralReplicas[volumeId] {
+					if _, found := volumeIdOrphanFileIds[volumeId][fid]; !found {
+						continue
+					}
+				}
+				volumeIdOrphanFileIds[volumeId][fid] = isSeveralReplicas[volumeId]
+			}
 
-	for volumeId, vinfo := range volumeIdToVInfo {
-		inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
-		if checkErr != nil {
-			return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
-		}
-		totalInUseCount += inUseCount
-		totalOrphanChunkCount += uint64(len(orphanFileIds))
-		totalOrphanDataSize += orphanDataSize
+			totalInUseCount += inUseCount
+			totalOrphanChunkCount += uint64(len(orphanFileIds))
+			totalOrphanDataSize += orphanDataSize
 
-		if verbose {
-			for _, fid := range orphanFileIds {
-				fmt.Fprintf(writer, "%s\n", fid)
+			if verbose {
+				for _, fid := range orphanFileIds {
+					fmt.Fprintf(writer, "%s\n", fid)
+				}
+			}
+			isEcVolumeReplicas[volumeId] = vinfo.isEcVolume
+			if isReadOnly, found := isReadOnlyReplicas[volumeId]; !(found && isReadOnly) {
+				isReadOnlyReplicas[volumeId] = vinfo.isReadOnly
 			}
+			serverReplicas[volumeId] = append(serverReplicas[volumeId], vinfo.server)
 		}
 
-		if applyPurging && len(orphanFileIds) > 0 {
+		for volumeId, orphanReplicaFileIds := range volumeIdOrphanFileIds {
+			if !(applyPurging && len(orphanReplicaFileIds) > 0) {
+				continue
+			}
+			orphanFileIds := []string{}
+			for fid, foundInAllReplicas := range orphanReplicaFileIds {
+				if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
+					orphanFileIds = append(orphanFileIds, fid)
+				}
+			}
+			if !(len(orphanFileIds) > 0) {
+				continue
+			}
 			if verbose {
 				fmt.Fprintf(writer, "purging process for volume %d", volumeId)
 			}
 
-			if vinfo.isEcVolume {
+			if isEcVolumeReplicas[volumeId] {
 				fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
 				continue
 			}
+			for _, server := range serverReplicas[volumeId] {
+				needleVID := needle.VolumeId(volumeId)
 
-			needleVID := needle.VolumeId(volumeId)
-
-			if vinfo.isReadOnly {
-				err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true)
-				if err != nil {
-					return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
-				}
-
-				fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
-				defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false)
-			}
+				if isReadOnlyReplicas[volumeId] {
+					err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, true)
+					if err != nil {
+						return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
+					}
 
-			fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
+					fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, server)
+					defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, server, false)
 
-			if verbose {
-				fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
-			}
+					fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, server)
+				}
+				if verbose {
+					fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
+				}
 
-			if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
-				return fmt.Errorf("purging volume %d: %v", volumeId, err)
+				if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
+					return fmt.Errorf("purging volume %d: %v", volumeId, err)
+				}
 			}
 		}
 	}
@@ -290,7 +349,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
 	return nil
 }
 
-func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
 
 	if verbose {
 		fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -316,7 +375,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
 			return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
 		}
 
-		err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
+		err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId))
 		if err != nil {
 			return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
 		}
@@ -327,19 +386,21 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
 
 }
 
-func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error {
+func (c *commandVolumeFsck) collectFilerFileIds(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error {
 
 	if verbose {
 		fmt.Fprintf(writer, "collecting file ids from filer ...\n")
 	}
 
 	files := make(map[uint32]*os.File)
-	for vid := range volumeIdToServer {
-		dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
-		if openErr != nil {
-			return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+	for _, volumeIdToServer := range dataNodeVolumeIdToVInfo {
+		for vid := range volumeIdToServer {
+			dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+			if openErr != nil {
+				return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+			}
+			files[vid] = dst
 		}
-		files[vid] = dst
 	}
 	defer func() {
 		for _, f := range files {
@@ -377,16 +438,16 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
 	})
 }
 
-func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) {
+func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, dataNodeId string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) {
 
 	if verbose {
-		fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId)
+		fmt.Fprintf(writer, "find missing file chunks in dataNodeId %s volume %d ...\n", dataNodeId, volumeId)
 	}
 
 	db := needle_map.NewMemDb()
 	defer db.Close()
 
-	if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+	if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)); err != nil {
 		return
 	}
@@ -473,12 +534,12 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
 	}
 }
 
-func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, dataNodeId string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
 
 	db := needle_map.NewMemDb()
 	defer db.Close()
 
-	if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+	if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)); err != nil {
 		return
 	}
@@ -509,8 +570,8 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder stri
 
 	if orphanFileCount > 0 {
 		pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
-		fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
-			volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
+		fmt.Fprintf(writer, "dataNode:%s\tvolume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
+			dataNodeId, volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
 	}
 
 	return
@@ -524,13 +585,13 @@ type VInfo struct {
 	isReadOnly bool
 }
 
-func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
+func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[string]map[uint32]VInfo, err error) {
 
 	if verbose {
 		fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
 	}
 
-	volumeIdToServer = make(map[uint32]VInfo)
+	volumeIdToServer = make(map[string]map[uint32]VInfo)
 	// collect topology information
 	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
 	if err != nil {
@@ -539,8 +600,10 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
 
 	eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
 		for _, diskInfo := range t.DiskInfos {
+			dataNodeId := t.GetId()
+			volumeIdToServer[dataNodeId] = make(map[uint32]VInfo)
 			for _, vi := range diskInfo.VolumeInfos {
-				volumeIdToServer[vi.Id] = VInfo{
+				volumeIdToServer[dataNodeId][vi.Id] = VInfo{
 					server:     pb.NewServerAddressFromDataNode(t),
 					collection: vi.Collection,
 					isEcVolume: false,
@@ -548,7 +611,7 @@ func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose boo
 				}
 			}
 			for _, ecShardInfo := range diskInfo.EcShardInfos {
-				volumeIdToServer[ecShardInfo.Id] = VInfo{
+				volumeIdToServer[dataNodeId][ecShardInfo.Id] = VInfo{
 					server:     pb.NewServerAddressFromDataNode(t),
 					collection: ecShardInfo.Collection,
 					isEcVolume: true,
@@ -600,8 +663,8 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
 	return
 }
 
-func getVolumeFileIdFile(tempFolder string, vid uint32) string {
-	return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
+func getVolumeFileIdFile(tempFolder string, dataNodeid string, vid uint32) string {
+	return filepath.Join(tempFolder, fmt.Sprintf("%s_%d.idx", dataNodeid, vid))
}
 
 func getFilerFileIdFile(tempFolder string, vid uint32) string {
```
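The thrust of the volume.fsck changes is that a volume id is no longer a unique key once replicas are tracked per data node, so `collectVolumeIds` now returns a map keyed by node id first, and every temporary `.idx` file is namespaced by the node that produced it. A sketch of why the old flat naming collided, mirroring the patched `getVolumeFileIdFile` above (the node addresses are hypothetical):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Mirrors the patched getVolumeFileIdFile: the data node id is folded into
// the file name, so replicas of the same volume on different servers can no
// longer overwrite each other's index dumps in the shared temp folder.
func volumeFileIdFile(tempFolder, dataNodeId string, vid uint32) string {
	return filepath.Join(tempFolder, fmt.Sprintf("%s_%d.idx", dataNodeId, vid))
}

func main() {
	// Two replicas of volume 7: two distinct files now; with the old
	// flat "7.idx" name, the second copy clobbered the first.
	fmt.Println(volumeFileIdFile("/tmp/sw_fsck", "10.0.0.1:8080", 7))
	fmt.Println(volumeFileIdFile("/tmp/sw_fsck", "10.0.0.2:8080", 7))
}
```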
"sort" ) func init() { @@ -16,6 +17,9 @@ func init() { } type commandVolumeList struct { + collectionPattern *string + readonly *bool + volumeId *uint64 } func (c *commandVolumeList) Name() string { @@ -34,6 +38,10 @@ func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io. volumeListCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) verbosityLevel := volumeListCommand.Int("v", 5, "verbose mode: 0, 1, 2, 3, 4, 5") + c.collectionPattern = volumeListCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") + c.readonly = volumeListCommand.Bool("readonly", false, "show only readonly") + c.volumeId = volumeListCommand.Uint64("volumeId", 0, "show only volume id") + if err = volumeListCommand.Parse(args); err != nil { return nil } @@ -44,7 +52,7 @@ func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io. return err } - writeTopologyInfo(writer, topologyInfo, volumeSizeLimitMb, *verbosityLevel) + c.writeTopologyInfo(writer, topologyInfo, volumeSizeLimitMb, *verbosityLevel) return nil } @@ -65,66 +73,90 @@ func diskInfoToString(diskInfo *master_pb.DiskInfo) string { return buf.String() } -func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64, verbosityLevel int) statistics { +func (c *commandVolumeList) writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64, verbosityLevel int) statistics { output(verbosityLevel >= 0, writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(t.DiskInfos)) - sort.Slice(t.DataCenterInfos, func(i, j int) bool { - return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id + slices.SortFunc(t.DataCenterInfos, func(a, b *master_pb.DataCenterInfo) bool { + return a.Id < b.Id }) var s statistics for _, dc := range t.DataCenterInfos { - s = s.plus(writeDataCenterInfo(writer, dc, verbosityLevel)) + s = s.plus(c.writeDataCenterInfo(writer, dc, verbosityLevel)) } output(verbosityLevel >= 0, writer, "%+v \n", s) return s } -func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosityLevel int) statistics { + +func (c *commandVolumeList) writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosityLevel int) statistics { output(verbosityLevel >= 1, writer, " DataCenter %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics - sort.Slice(t.RackInfos, func(i, j int) bool { - return t.RackInfos[i].Id < t.RackInfos[j].Id + slices.SortFunc(t.RackInfos, func(a, b *master_pb.RackInfo) bool { + return a.Id < b.Id }) for _, r := range t.RackInfos { - s = s.plus(writeRackInfo(writer, r, verbosityLevel)) + s = s.plus(c.writeRackInfo(writer, r, verbosityLevel)) } output(verbosityLevel >= 1, writer, " DataCenter %s %+v \n", t.Id, s) return s } -func writeRackInfo(writer io.Writer, t *master_pb.RackInfo, verbosityLevel int) statistics { + +func (c *commandVolumeList) writeRackInfo(writer io.Writer, t *master_pb.RackInfo, verbosityLevel int) statistics { output(verbosityLevel >= 2, writer, " Rack %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics - sort.Slice(t.DataNodeInfos, func(i, j int) bool { - return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id + slices.SortFunc(t.DataNodeInfos, func(a, b *master_pb.DataNodeInfo) bool { + return a.Id < b.Id }) for _, dn := range t.DataNodeInfos { - s = s.plus(writeDataNodeInfo(writer, dn, verbosityLevel)) + s = s.plus(c.writeDataNodeInfo(writer, dn, verbosityLevel)) } output(verbosityLevel >= 2, writer, " Rack %s %+v 
\n", t.Id, s) return s } -func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo, verbosityLevel int) statistics { + +func (c *commandVolumeList) writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo, verbosityLevel int) statistics { output(verbosityLevel >= 3, writer, " DataNode %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics for _, diskInfo := range t.DiskInfos { - s = s.plus(writeDiskInfo(writer, diskInfo, verbosityLevel)) + s = s.plus(c.writeDiskInfo(writer, diskInfo, verbosityLevel)) } output(verbosityLevel >= 3, writer, " DataNode %s %+v \n", t.Id, s) return s } -func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo, verbosityLevel int) statistics { +func (c *commandVolumeList) isNotMatchDiskInfo(readOnly bool, collection string, volumeId uint32) bool { + if *c.readonly && !readOnly { + return true + } + if *c.collectionPattern != "" { + if matched, _ := filepath.Match(*c.collectionPattern, collection); !matched { + return true + } + } + if *c.volumeId > 0 && *c.volumeId != uint64(volumeId) { + return true + } + return false +} + +func (c *commandVolumeList) writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo, verbosityLevel int) statistics { var s statistics diskType := t.Type if diskType == "" { diskType = "hdd" } output(verbosityLevel >= 4, writer, " Disk %s(%s)\n", diskType, diskInfoToString(t)) - sort.Slice(t.VolumeInfos, func(i, j int) bool { - return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id + slices.SortFunc(t.VolumeInfos, func(a, b *master_pb.VolumeInformationMessage) bool { + return a.Id < b.Id }) for _, vi := range t.VolumeInfos { + if c.isNotMatchDiskInfo(vi.ReadOnly, vi.Collection, vi.Id) { + continue + } s = s.plus(writeVolumeInformationMessage(writer, vi, verbosityLevel)) } for _, ecShardInfo := range t.EcShardInfos { + if c.isNotMatchDiskInfo(false, ecShardInfo.Collection, ecShardInfo.Id) { + continue + } output(verbosityLevel >= 5, writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) } output(verbosityLevel >= 4, writer, " Disk %s %+v \n", diskType, s) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 31ebcfec1..f07ea4b79 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -8,9 +8,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "io" "os" - "sort" ) func init() { @@ -153,11 +153,9 @@ func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyI func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) { for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { - - sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id) + slices.SortFunc(otherNodes, func(a, b *EcNode) bool { + return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id) }) - for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] collectionPrefix := "" @@ -188,10 +186,9 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][ return v.DiskType == vol.DiskType 
}) } - sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localVolumeRatio(fn) > otherNodes[j].localVolumeRatio(fn) + slices.SortFunc(otherNodes, func(a, b *Node) bool { + return a.localVolumeRatio(fn) > b.localVolumeRatio(fn) }) - for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index 61b1f06fa..fc454c9ff 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -22,7 +22,7 @@ func (c *commandVacuum) Name() string { func (c *commandVacuum) Help() string { return `compact volumes if deleted entries are more than the limit - volume.vacuum [-garbageThreshold=0.3] + volume.vacuum [-garbageThreshold=0.3] [-collection=<collection name>] [-volumeId=<volume id>] ` } @@ -31,6 +31,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit") + collection := volumeVacuumCommand.String("collection", "", "vacuum this collection") + volumeId := volumeVacuumCommand.Uint("volumeId", 0, "the volume id") if err = volumeVacuumCommand.Parse(args); err != nil { return } @@ -42,6 +44,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ GarbageThreshold: float32(*garbageThreshold), + VolumeId: uint32(*volumeId), + Collection: *collection, }) return err }) diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 90ce2dbb4..94a68f5bc 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -8,12 +8,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util/grace" + "golang.org/x/exp/slices" "io" "math/rand" "os" "path" "regexp" - "sort" "strings" "github.com/peterh/liner" @@ -25,11 +25,9 @@ var ( ) func RunShell(options ShellOptions) { - - sort.Slice(Commands, func(i, j int) bool { - return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 + slices.SortFunc(Commands, func(a, b command) bool { + return strings.Compare(a.Name(), b.Name()) < 0 }) - line = liner.NewLiner() defer line.Close() grace.OnInterrupt(func() { |
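The new volume.list filters (`-collectionPattern`, `-readonly`, `-volumeId`) and the volume.vacuum flags (`-collection`, `-volumeId`) both narrow commands that previously operated on everything. The collection pattern uses `filepath.Match` semantics, so `*` and `?` wildcards work but there is no `**`. A quick sketch of that matching behavior, using the same standard-library call `isNotMatchDiskInfo` makes above:

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Hypothetical collection names; "bucket-*" is the user-supplied
	// -collectionPattern value.
	for _, collection := range []string{"bucket-a", "bucket-b", "logs"} {
		matched, err := filepath.Match("bucket-*", collection)
		if err != nil {
			panic(err) // only reachable with a malformed pattern
		}
		fmt.Printf("%-8s matched=%v\n", collection, matched)
	}
}
```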
