diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_cluster_check.go | 248 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_add.go | 59 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_ps.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_cluster_raft_remove.go | 56 | ||||
| -rw-r--r-- | weed/shell/command_ec_balance.go | 15 | ||||
| -rw-r--r-- | weed/shell/command_ec_common.go | 17 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_cat.go | 15 | ||||
| -rw-r--r-- | weed/shell/command_mount_configure.go | 64 | ||||
| -rw-r--r-- | weed/shell/command_s3_clean_uploads.go | 18 | ||||
| -rw-r--r-- | weed/shell/command_volume_balance.go | 15 | ||||
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 6 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 10 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 20 | ||||
| -rw-r--r-- | weed/shell/command_volume_list.go | 18 | ||||
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 13 | ||||
| -rw-r--r-- | weed/shell/command_volume_vacuum.go | 6 | ||||
| -rw-r--r-- | weed/shell/commands.go | 2 | ||||
| -rw-r--r-- | weed/shell/shell_liner.go | 8 |
19 files changed, 571 insertions, 72 deletions
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go new file mode 100644 index 000000000..616669b6d --- /dev/null +++ b/weed/shell/command_cluster_check.go @@ -0,0 +1,248 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandClusterCheck{}) +} + +type commandClusterCheck struct { +} + +func (c *commandClusterCheck) Name() string { + return "cluster.check" +} + +func (c *commandClusterCheck) Help() string { + return `check current cluster network connectivity + + cluster.check + +` +} + +func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + if err = clusterPsCommand.Parse(args); err != nil { + return nil + } + + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0) + if err != nil { + return err + } + fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos)) + + emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""] + hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"] + if !emptyDiskTypeFound && !hddDiskTypeFound { + return fmt.Errorf("Need to a hdd disk type!") + } + if emptyDiskTypeFound && emptyDiskTypeDiskInfo.VolumeCount == 0 || hddDiskTypeFound && hddDiskTypeDiskInfo.VolumeCount == 0 { + return fmt.Errorf("Need to a hdd disk type!") + } + + // collect filers + var filers []pb.ServerAddress + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + + for _, node := range resp.ClusterNodes { + filers = append(filers, pb.ServerAddress(node.Address)) + } + return err + }) + if err != nil { + return + } + fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers) + + // collect volume servers + var volumeServers []pb.ServerAddress + t, _, err := collectTopologyInfo(commandEnv, 0) + if err != nil { + return err + } + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn)) + } + } + } + fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers) + + // collect all masters + var masters []pb.ServerAddress + for _, master := range commandEnv.MasterClient.GetMasters() { + masters = append(masters, master) + } + + // check from master to volume servers + for _, master := range masters { + for _, volumeServer := range volumeServers { + fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer)) + err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: string(volumeServer), + TargetType: cluster.VolumeServerType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between masters + for _, sourceMaster := range masters { + for _, targetMaster := range masters { + if sourceMaster == targetMaster { + continue + } + fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster)) + err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: string(targetMaster), + TargetType: cluster.MasterType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from volume servers to masters + for _, volumeServer := range volumeServers { + for _, master := range masters { + fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master)) + err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{ + Target: string(master), + TargetType: cluster.MasterType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from filers to masters + for _, filer := range filers { + for _, master := range masters { + fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master)) + err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(master), + TargetType: cluster.MasterType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from filers to volume servers + for _, filer := range filers { + for _, volumeServer := range volumeServers { + fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer)) + err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(volumeServer), + TargetType: cluster.VolumeServerType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between volume servers + for _, sourceVolumeServer := range volumeServers { + for _, targetVolumeServer := range volumeServers { + if sourceVolumeServer == targetVolumeServer { + continue + } + fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer)) + err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{ + Target: string(targetVolumeServer), + TargetType: cluster.VolumeServerType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between filers, and need to connect to itself + for _, sourceFiler := range filers { + for _, targetFiler := range filers { + fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler)) + err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(targetFiler), + TargetType: cluster.FilerType, + }) + if err == nil { + printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs) + } + return err + }) + if err != nil { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + return nil +} + +func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) { + roundTripTimeMs := float32(stopNs-startNs) / 1000000 + deltaTimeMs := float32(remoteNs-(startNs+stopNs)/2) / 1000000 + fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs) +} diff --git a/weed/shell/command_cluster_raft_add.go b/weed/shell/command_cluster_raft_add.go new file mode 100644 index 000000000..e5f3c41c9 --- /dev/null +++ b/weed/shell/command_cluster_raft_add.go @@ -0,0 +1,59 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftServerAdd{}) +} + +type commandRaftServerAdd struct { +} + +func (c *commandRaftServerAdd) Name() string { + return "cluster.raft.add" +} + +func (c *commandRaftServerAdd) Help() string { + return `add a server to the raft cluster + + Example: + cluster.raft.add -id <server_name> -address <server_host:port> -voter +` +} + +func (c *commandRaftServerAdd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + serverId := raftServerAddCommand.String("id", "", "server id") + serverAddress := raftServerAddCommand.String("address", "", "server grpc address") + serverVoter := raftServerAddCommand.Bool("voter", true, "assign it a vote") + if err = raftServerAddCommand.Parse(args); err != nil { + return nil + } + + if *serverId == "" || *serverAddress == "" { + return fmt.Errorf("empty server id or address") + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftAddServer(context.Background(), &master_pb.RaftAddServerRequest{ + Id: *serverId, + Address: *serverAddress, + Voter: *serverVoter, + }) + if err != nil { + return fmt.Errorf("raft add server: %v", err) + } + println("added server", *serverId) + return nil + }) + + return err + +} diff --git a/weed/shell/command_cluster_raft_ps.go b/weed/shell/command_cluster_raft_ps.go new file mode 100644 index 000000000..ea868db06 --- /dev/null +++ b/weed/shell/command_cluster_raft_ps.go @@ -0,0 +1,51 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftClusterPs{}) +} + +type commandRaftClusterPs struct { +} + +func (c *commandRaftClusterPs) Name() string { + return "cluster.raft.ps" +} + +func (c *commandRaftClusterPs) Help() string { + return `check current raft cluster status + + cluster.raft.ps +` +} + +func (c *commandRaftClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftClusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + if err = raftClusterPsCommand.Parse(args); err != nil { + return nil + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{}) + if err != nil { + return fmt.Errorf("raft list cluster: %v", err) + } + fmt.Fprintf(writer, "the raft cluster has %d servers\n", len(resp.ClusterServers)) + for _, server := range resp.ClusterServers { + fmt.Fprintf(writer, " * %s %s (%s)\n", server.Id, server.Address, server.Suffrage) + } + + return nil + }) + + return err + +} diff --git a/weed/shell/command_cluster_raft_remove.go b/weed/shell/command_cluster_raft_remove.go new file mode 100644 index 000000000..532a1469c --- /dev/null +++ b/weed/shell/command_cluster_raft_remove.go @@ -0,0 +1,56 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandRaftServerRemove{}) +} + +type commandRaftServerRemove struct { +} + +func (c *commandRaftServerRemove) Name() string { + return "cluster.raft.remove" +} + +func (c *commandRaftServerRemove) Help() string { + return `remove a server from the raft cluster + + Example: + cluster.raft.remove -id <server_name> +` +} + +func (c *commandRaftServerRemove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + raftServerAddCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + serverId := raftServerAddCommand.String("id", "", "server id") + if err = raftServerAddCommand.Parse(args); err != nil { + return nil + } + + if *serverId == "" { + return fmt.Errorf("empty server id") + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: *serverId, + Force: true, + }) + if err != nil { + return fmt.Errorf("raft remove server: %v", err) + } + println("removed server", *serverId) + return nil + }) + + return err + +} diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 6cd91119b..393d44b80 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -4,12 +4,11 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "io" - "sort" - "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" + "io" ) func init() { @@ -411,8 +410,8 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool hasMove := true for hasMove { hasMove = false - sort.Slice(rackEcNodes, func(i, j int) bool { - return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot + slices.SortFunc(rackEcNodes, func(a, b *EcNode) bool { + return a.freeEcSlot > b.freeEcSlot }) emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1] emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id] @@ -492,8 +491,8 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[ }) } } - sort.Slice(candidateEcNodes, func(i, j int) bool { - return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount + slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) bool { + return a.shardCount > b.shardCount }) for i := 0; i < n; i++ { selectedEcNodeIndex := -1 diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index b3bd0ce5d..27b650731 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,18 +3,17 @@ package shell import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "math" - "sort" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "google.golang.org/grpc" + "math" ) func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { @@ -116,14 +115,14 @@ func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, } func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) { - sort.Slice(ecNodes, func(i, j int) bool { - return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot + slices.SortFunc(ecNodes, func(a, b *EcNode) bool { + return a.freeEcSlot > b.freeEcSlot }) } func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) { - sort.Slice(ecNodes, func(i, j int) bool { - return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot + slices.SortFunc(ecNodes, func(a, b *EcNode) bool { + return a.freeEcSlot < b.freeEcSlot }) } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 9ae9b049c..251448908 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -95,7 +95,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) - if !found { + if !found && len(locations) > 0 { return fmt.Errorf("volume %d not found", vid) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index a7de6d3ef..4616c072d 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -2,11 +2,10 @@ package shell import ( "fmt" + "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" + "golang.org/x/exp/slices" "io" - "sort" - - "github.com/golang/protobuf/jsonpb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -55,14 +54,12 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W EmitDefaults: true, Indent: " ", } - - sort.Slice(respLookupEntry.Entry.Chunks, func(i, j int) bool { - if respLookupEntry.Entry.Chunks[i].Offset == respLookupEntry.Entry.Chunks[j].Offset { - return respLookupEntry.Entry.Chunks[i].Mtime < respLookupEntry.Entry.Chunks[j].Mtime + slices.SortFunc(respLookupEntry.Entry.Chunks, func(a, b *filer_pb.FileChunk) bool { + if a.Offset == b.Offset { + return a.Mtime < b.Mtime } - return respLookupEntry.Entry.Chunks[i].Offset < respLookupEntry.Entry.Chunks[j].Offset + return a.Offset < b.Offset }) - text, marshalErr := m.MarshalToString(respLookupEntry.Entry) if marshalErr != nil { return fmt.Errorf("marshal meta: %v", marshalErr) diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go new file mode 100644 index 000000000..8c268d35c --- /dev/null +++ b/weed/shell/command_mount_configure.go @@ -0,0 +1,64 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + _ "google.golang.org/grpc/resolver/passthrough" + "io" +) + +func init() { + Commands = append(Commands, &commandMountConfigure{}) +} + +type commandMountConfigure struct { +} + +func (c *commandMountConfigure) Name() string { + return "mount.configure" +} + +func (c *commandMountConfigure) Help() string { + return `configure the mount on current server + + mount.configure -dir=<mount_directory> + + This command connects with local mount via unix socket, so it can only run locally. + The "mount_directory" value needs to be exactly the same as how mount was started in "weed mount -dir=<mount_directory>" + +` +} + +func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + mountDir := mountConfigureCommand.String("dir", "", "the mount directory same as how \"weed mount -dir=<mount_directory>\" was started") + mountQuota := mountConfigureCommand.Int("quotaMB", 0, "the quota in MB") + if err = mountConfigureCommand.Parse(args); err != nil { + return nil + } + + mountDirHash := util.HashToInt32([]byte(*mountDir)) + if mountDirHash < 0 { + mountDirHash = -mountDirHash + } + localSocket := fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash) + + clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return + } + defer clientConn.Close() + + client := mount_pb.NewSeaweedMountClient(clientConn) + _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ + CollectionCapacity: int64(*mountQuota) * 1024 * 1024, + }) + + return +} diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 4f893df7a..a6dc8f574 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "io" "math" @@ -39,6 +40,8 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer return nil } + signingKey := util.GetViper().GetString("jwt.signing.key") + var filerBucketsPath string filerBucketsPath, err = readFilerBucketsPath(commandEnv) if err != nil { @@ -55,14 +58,16 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer } for _, bucket := range buckets { - c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo) + if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil { + fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for backet %s: %v", bucket, err)) + } } return err } -func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error { +func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error { uploadsDir := filerBucketsPath + "/" + bucket + "/.uploads" var staleUploads []string now := time.Now() @@ -77,12 +82,17 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io return fmt.Errorf("list uploads under %v: %v", uploadsDir, err) } + var encodedJwt security.EncodedJwt + if signingKey != "" { + encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60) + } + for _, staleUpload := range staleUploads { deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload) fmt.Fprintf(writer, "purge %s\n", deleteUrl) - err = util.Delete(deleteUrl, "") - if err != nil { + err = util.Delete(deleteUrl, string(encodedJwt)) + if err != nil && err.Error() != "" { return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) } } diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 7a983de1a..b01d348c5 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -6,9 +6,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "io" "os" - "sort" "time" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -224,14 +224,14 @@ func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool } func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { - sort.Slice(volumes, func(i, j int) bool { - return volumes[i].Size < volumes[j].Size + slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool { + return a.Size < b.Size }) } func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { - sort.Slice(volumes, func(i, j int) bool { - return volumes[i].Id < volumes[j].Id + slices.SortFunc(volumes, func(a, b *master_pb.VolumeInformationMessage) bool { + return a.Id < b.Id }) } @@ -255,10 +255,9 @@ func balanceSelectedVolume(commandEnv *CommandEnv, diskType types.DiskType, volu for hasMoved { hasMoved = false - sort.Slice(nodesWithCapacity, func(i, j int) bool { - return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc) + slices.SortFunc(nodesWithCapacity, func(a, b *Node) bool { + return a.localVolumeRatio(capacityFunc) < b.localVolumeRatio(capacityFunc) }) - fullNode := nodesWithCapacity[len(nodesWithCapacity)-1] var candidateVolumes []*master_pb.VolumeInformationMessage for _, v := range fullNode.selectedVolumes { diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 54edd53dd..53284096d 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -9,9 +9,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "golang.org/x/exp/slices" "io" "math" - "sort" ) func init() { @@ -70,8 +70,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write } for _, replicas := range volumeReplicas { - sort.Slice(replicas, func(i, j int) bool { - return fileCount(replicas[i]) > fileCount(replicas[j]) + slices.SortFunc(replicas, func(a, b *VolumeReplica) bool { + return fileCount(a) > fileCount(b) }) for len(replicas) >= 2 { a, b := replicas[0], replicas[1] diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 78285d8a5..c4bef5925 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -7,9 +7,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "io" "path/filepath" - "sort" "strconv" "time" @@ -308,8 +308,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { fn := capacityByFreeVolumeCount(diskType) - sort.Slice(dataNodes, func(i, j int) bool { - return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) + slices.SortFunc(dataNodes, func(a, b location) bool { + return fn(a.dataNode) > fn(b.dataNode) }) } @@ -488,9 +488,7 @@ func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[st } func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica { - - sort.Slice(replicas, func(i, j int) bool { - a, b := replicas[i], replicas[j] + slices.SortFunc(replicas, func(a, b *VolumeReplica) bool { if a.info.Size != b.info.Size { return a.info.Size < b.info.Size } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1b3d7bf0d..7d3aa28a5 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "sync" "github.com/chrislusf/seaweedfs/weed/filer" @@ -92,8 +93,27 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("failed to collect all volume locations: %v", err) } + isBucketsPath := false + var fillerBucketsPath string + if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "" { + fillerBucketsPath, err = readFilerBucketsPath(commandEnv) + if err != nil { + return fmt.Errorf("read filer buckets path: %v", err) + } + if strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath) { + isBucketsPath = true + } + } + if err != nil { + return fmt.Errorf("read filer buckets path: %v", err) + } + // collect each volume file ids for volumeId, vinfo := range volumeIdToVInfo { + if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) { + delete(volumeIdToVInfo, volumeId) + continue + } err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index 4c0429ecb..9150752d5 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -6,9 +6,9 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "golang.org/x/exp/slices" "io" - "sort" ) func init() { @@ -67,8 +67,8 @@ func diskInfoToString(diskInfo *master_pb.DiskInfo) string { func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64, verbosityLevel int) statistics { output(verbosityLevel >= 0, writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(t.DiskInfos)) - sort.Slice(t.DataCenterInfos, func(i, j int) bool { - return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id + slices.SortFunc(t.DataCenterInfos, func(a, b *master_pb.DataCenterInfo) bool { + return a.Id < b.Id }) var s statistics for _, dc := range t.DataCenterInfos { @@ -80,8 +80,8 @@ func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLi func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosityLevel int) statistics { output(verbosityLevel >= 1, writer, " DataCenter %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics - sort.Slice(t.RackInfos, func(i, j int) bool { - return t.RackInfos[i].Id < t.RackInfos[j].Id + slices.SortFunc(t.RackInfos, func(a, b *master_pb.RackInfo) bool { + return a.Id < b.Id }) for _, r := range t.RackInfos { s = s.plus(writeRackInfo(writer, r, verbosityLevel)) @@ -92,8 +92,8 @@ func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo, verbosit func writeRackInfo(writer io.Writer, t *master_pb.RackInfo, verbosityLevel int) statistics { output(verbosityLevel >= 2, writer, " Rack %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics - sort.Slice(t.DataNodeInfos, func(i, j int) bool { - return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id + slices.SortFunc(t.DataNodeInfos, func(a, b *master_pb.DataNodeInfo) bool { + return a.Id < b.Id }) for _, dn := range t.DataNodeInfos { s = s.plus(writeDataNodeInfo(writer, dn, verbosityLevel)) @@ -118,8 +118,8 @@ func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo, verbosityLevel int) diskType = "hdd" } output(verbosityLevel >= 4, writer, " Disk %s(%s)\n", diskType, diskInfoToString(t)) - sort.Slice(t.VolumeInfos, func(i, j int) bool { - return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id + slices.SortFunc(t.VolumeInfos, func(a, b *master_pb.VolumeInformationMessage) bool { + return a.Id < b.Id }) for _, vi := range t.VolumeInfos { s = s.plus(writeVolumeInformationMessage(writer, vi, verbosityLevel)) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 31ebcfec1..f07ea4b79 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -8,9 +8,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "io" "os" - "sort" ) func init() { @@ -153,11 +153,9 @@ func evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyI func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) { for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { - - sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localShardIdCount(ecShardInfo.Id) < otherNodes[j].localShardIdCount(ecShardInfo.Id) + slices.SortFunc(otherNodes, func(a, b *EcNode) bool { + return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id) }) - for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] collectionPrefix := "" @@ -188,10 +186,9 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][ return v.DiskType == vol.DiskType }) } - sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localVolumeRatio(fn) > otherNodes[j].localVolumeRatio(fn) + slices.SortFunc(otherNodes, func(a, b *Node) bool { + return a.localVolumeRatio(fn) > b.localVolumeRatio(fn) }) - for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index 61b1f06fa..fc454c9ff 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -22,7 +22,7 @@ func (c *commandVacuum) Name() string { func (c *commandVacuum) Help() string { return `compact volumes if deleted entries are more than the limit - volume.vacuum [-garbageThreshold=0.3] + volume.vacuum [-garbageThreshold=0.3] [-collection=<collection name>] [-volumeId=<volume id>] ` } @@ -31,6 +31,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit") + collection := volumeVacuumCommand.String("collection", "", "vacuum this collection") + volumeId := volumeVacuumCommand.Uint("volumeId", 0, "the volume id") if err = volumeVacuumCommand.Parse(args); err != nil { return } @@ -42,6 +44,8 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ GarbageThreshold: float32(*garbageThreshold), + VolumeId: uint32(*volumeId), + Collection: *collection, }) return err }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index ec71edee0..3ff49f1d2 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -46,7 +46,7 @@ var ( func NewCommandEnv(options *ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin") diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index 90ce2dbb4..94a68f5bc 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -8,12 +8,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util/grace" + "golang.org/x/exp/slices" "io" "math/rand" "os" "path" "regexp" - "sort" "strings" "github.com/peterh/liner" @@ -25,11 +25,9 @@ var ( ) func RunShell(options ShellOptions) { - - sort.Slice(Commands, func(i, j int) bool { - return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 + slices.SortFunc(Commands, func(a, b command) bool { + return strings.Compare(a.Name(), b.Name()) < 0 }) - line = liner.NewLiner() defer line.Close() grace.OnInterrupt(func() { |
