diff options
| author | eddy-gfx <86091021+gfxlabs@users.noreply.github.com> | 2022-04-06 18:45:55 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-04-06 18:45:55 -0500 |
| commit | ec53eec94f8a745c6a289d593725824a00314793 (patch) | |
| tree | 415e4d43eb89c415caa5b38f31f7aa15bbe49cc3 /weed/shell | |
| parent | 7e925175715c57b49552a94d25d1d3dc40f1d881 (diff) | |
| parent | 3ab2c0e5c0263de5af35dad91a9107e6038f4203 (diff) | |
| download | seaweedfs-ec53eec94f8a745c6a289d593725824a00314793.tar.xz seaweedfs-ec53eec94f8a745c6a289d593725824a00314793.zip | |
Merge branch 'master' into a
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_cluster_check.go | 233 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_mount_configure.go | 64 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 20 |
4 files changed, 318 insertions, 1 deletions
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go new file mode 100644 index 000000000..f841bd224 --- /dev/null +++ b/weed/shell/command_cluster_check.go @@ -0,0 +1,233 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +func init() { + Commands = append(Commands, &commandClusterCheck{}) +} + +type commandClusterCheck struct { +} + +func (c *commandClusterCheck) Name() string { + return "cluster.check" +} + +func (c *commandClusterCheck) Help() string { + return `check current cluster network connectivity + + cluster.check + +` +} + +func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + if err = clusterPsCommand.Parse(args); err != nil { + return nil + } + + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0) + if err != nil { + return err + } + fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos)) + + emptyDiskTypeDiskInfo, emptyDiskTypeFound := topologyInfo.DiskInfos[""] + hddDiskTypeDiskInfo, hddDiskTypeFound := topologyInfo.DiskInfos["hdd"] + if !emptyDiskTypeFound && !hddDiskTypeFound || emptyDiskTypeDiskInfo.VolumeCount == 0 && hddDiskTypeDiskInfo.VolumeCount == 0 { + return fmt.Errorf("Need to a hdd disk type!") + } + + // collect filers + var filers []pb.ServerAddress + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + + for _, node := range resp.ClusterNodes { + filers = append(filers, pb.ServerAddress(node.Address)) + } + return err + }) + if err != nil { + return + } + fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers) + + // collect volume servers + var volumeServers []pb.ServerAddress + t, _, err := collectTopologyInfo(commandEnv, 0) + if err != nil { + return err + } + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn)) + } + } + } + fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers) + + // collect all masters + var masters []pb.ServerAddress + for _, master := range commandEnv.MasterClient.GetMasters() { + masters = append(masters, master) + } + + // check from master to volume servers + for _, master := range masters { + for _, volumeServer := range volumeServers { + fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer)) + err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + _, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: string(volumeServer), + TargetType: cluster.VolumeServerType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between masters + for _, sourceMaster := range masters { + for _, targetMaster := range masters { + if sourceMaster == targetMaster { + continue + } + fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster)) + err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + _, err := client.Ping(context.Background(), &master_pb.PingRequest{ + Target: string(targetMaster), + TargetType: cluster.MasterType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from volume servers to masters + for _, volumeServer := range volumeServers { + for _, master := range masters { + fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master)) + err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{ + Target: string(master), + TargetType: cluster.MasterType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from filers to masters + for _, filer := range filers { + for _, master := range masters { + fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master)) + err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(master), + TargetType: cluster.MasterType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check from filers to volume servers + for _, filer := range filers { + for _, volumeServer := range volumeServers { + fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer)) + err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(volumeServer), + TargetType: cluster.VolumeServerType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between volume servers + for _, sourceVolumeServer := range volumeServers { + for _, targetVolumeServer := range volumeServers { + if sourceVolumeServer == targetVolumeServer { + continue + } + fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer)) + err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{ + Target: string(targetVolumeServer), + TargetType: cluster.VolumeServerType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + // check between filers, and need to connect to itself + for _, sourceFiler := range filers { + for _, targetFiler := range filers { + fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler)) + err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + _, err := client.Ping(context.Background(), &filer_pb.PingRequest{ + Target: string(targetFiler), + TargetType: cluster.FilerType, + }) + return err + }) + if err == nil { + fmt.Fprintf(writer, "ok\n") + } else { + fmt.Fprintf(writer, "%v\n", err) + } + } + } + + return nil +} diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 9ae9b049c..251448908 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -95,7 +95,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) - if !found { + if !found && len(locations) > 0 { return fmt.Errorf("volume %d not found", vid) } diff --git a/weed/shell/command_mount_configure.go b/weed/shell/command_mount_configure.go new file mode 100644 index 000000000..8c268d35c --- /dev/null +++ b/weed/shell/command_mount_configure.go @@ -0,0 +1,64 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + _ "google.golang.org/grpc/resolver/passthrough" + "io" +) + +func init() { + Commands = append(Commands, &commandMountConfigure{}) +} + +type commandMountConfigure struct { +} + +func (c *commandMountConfigure) Name() string { + return "mount.configure" +} + +func (c *commandMountConfigure) Help() string { + return `configure the mount on current server + + mount.configure -dir=<mount_directory> + + This command connects with local mount via unix socket, so it can only run locally. + The "mount_directory" value needs to be exactly the same as how mount was started in "weed mount -dir=<mount_directory>" + +` +} + +func (c *commandMountConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + mountConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + mountDir := mountConfigureCommand.String("dir", "", "the mount directory same as how \"weed mount -dir=<mount_directory>\" was started") + mountQuota := mountConfigureCommand.Int("quotaMB", 0, "the quota in MB") + if err = mountConfigureCommand.Parse(args); err != nil { + return nil + } + + mountDirHash := util.HashToInt32([]byte(*mountDir)) + if mountDirHash < 0 { + mountDirHash = -mountDirHash + } + localSocket := fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash) + + clientConn, err := grpc.Dial("passthrough:///unix://"+localSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return + } + defer clientConn.Close() + + client := mount_pb.NewSeaweedMountClient(clientConn) + _, err = client.Configure(context.Background(), &mount_pb.ConfigureRequest{ + CollectionCapacity: int64(*mountQuota) * 1024 * 1024, + }) + + return +} diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1b3d7bf0d..7d3aa28a5 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "sync" "github.com/chrislusf/seaweedfs/weed/filer" @@ -92,8 +93,27 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("failed to collect all volume locations: %v", err) } + isBucketsPath := false + var fillerBucketsPath string + if *findMissingChunksInFiler && *findMissingChunksInFilerPath != "" { + fillerBucketsPath, err = readFilerBucketsPath(commandEnv) + if err != nil { + return fmt.Errorf("read filer buckets path: %v", err) + } + if strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath) { + isBucketsPath = true + } + } + if err != nil { + return fmt.Errorf("read filer buckets path: %v", err) + } + // collect each volume file ids for volumeId, vinfo := range volumeIdToVInfo { + if isBucketsPath && !strings.HasPrefix(*findMissingChunksInFilerPath, fillerBucketsPath+"/"+vinfo.collection) { + delete(volumeIdToVInfo, volumeId) + continue + } err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) |
