diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_fs_rm.go | 100 | ||||
| -rw-r--r-- | weed/shell/command_remote_configure.go | 142 | ||||
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 101 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 9 |
6 files changed, 336 insertions, 71 deletions
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go new file mode 100644 index 000000000..b383366ca --- /dev/null +++ b/weed/shell/command_fs_rm.go @@ -0,0 +1,100 @@ +package shell + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsRm{}) +} + +type commandFsRm struct { +} + +func (c *commandFsRm) Name() string { + return "fs.rm" +} + +func (c *commandFsRm) Help() string { + return `remove file and directory entries + + fs.rm [-rf] <entry1> <entry2> ... + + fs.rm /dir/file_name1 dir/file_name2 + fs.rm /dir + + The option "-r" can be recursive. + The option "-f" can be ignored by recursive error. +` +} + +func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + isRecursive := false + ignoreRecursiveError := false + var entiries []string + for _, arg := range args { + if !strings.HasPrefix(arg, "-") { + entiries = append(entiries, arg) + continue + } + for _, t := range arg { + switch t { + case 'r': + isRecursive = true + case 'f': + ignoreRecursiveError = true + } + } + } + if len(entiries) < 1 { + return fmt.Errorf("need to have arguments") + } + + commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + for _, entry := range entiries { + targetPath, err := commandEnv.parseUrl(entry) + if err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + continue + } + + targetDir, targetName := util.FullPath(targetPath).DirAndName() + + lookupRequest := &filer_pb.LookupDirectoryEntryRequest{ + Directory: targetDir, + Name: targetName, + } + _, err = filer_pb.LookupEntry(client, lookupRequest) + if err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + continue + } + + request := &filer_pb.DeleteEntryRequest{ + Directory: targetDir, + Name: targetName, + IgnoreRecursiveError: ignoreRecursiveError, + IsDeleteData: true, + IsRecursive: isRecursive, + IsFromOtherCluster: false, + Signatures: nil, + } + if resp, err := client.DeleteEntry(context.Background(), request); err != nil { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err) + } else { + if resp.Error != "" { + fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, resp.Error) + } + } + } + return nil + }) + + return +} diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go new file mode 100644 index 000000000..567143ce1 --- /dev/null +++ b/weed/shell/command_remote_configure.go @@ -0,0 +1,142 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "io" + "regexp" +) + +func init() { + Commands = append(Commands, &commandRemoteConfigure{}) +} + +type commandRemoteConfigure struct { +} + +func (c *commandRemoteConfigure) Name() string { + return "remote.configure" +} + +func (c *commandRemoteConfigure) Help() string { + return `remote storage configuration + + # see the current configurations + remote.configure + + # set or update a configuration + remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy + + # delete one configuration + remote.configure -delete -name=cloud1 + +` +} + +var ( + isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString +) + +func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + conf := &filer_pb.RemoteConf{} + + remoteConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + isDelete := remoteConfigureCommand.Bool("delete", false, "delete one remote storage by its name") + + remoteConfigureCommand.StringVar(&conf.Name, "name", "", "a short name to identify the remote storage") + remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", "storage type, currently only support s3") + + remoteConfigureCommand.StringVar(&conf.S3AccessKey, "s3.access_key", "", "s3 access key") + remoteConfigureCommand.StringVar(&conf.S3SecretKey, "s3.secret_key", "", "s3 secret key") + remoteConfigureCommand.StringVar(&conf.S3Region, "s3.region", "us-east-2", "s3 region") + remoteConfigureCommand.StringVar(&conf.S3Endpoint, "s3.endpoint", "", "endpoint for s3-compatible local object store") + + if err = remoteConfigureCommand.Parse(args); err != nil { + return nil + } + + if conf.Name == "" { + return c.listExistingRemoteStorages(commandEnv, writer) + } + + if !isAlpha(conf.Name) { + return fmt.Errorf("only letters and numbers allowed in name: %v", conf.Name) + } + + if *isDelete { + return c.deleteRemoteStorage(commandEnv, writer, conf.Name) + } + + return c.saveRemoteStorage(commandEnv, writer, conf) + +} + +func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandEnv, writer io.Writer) error { + + return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(filer.DirectoryEtcRemote), "", func(entry *filer_pb.Entry, isLast bool) error { + if len(entry.Content) == 0 { + fmt.Fprintf(writer, "skipping %s\n", entry.Name) + return nil + } + conf := &filer_pb.RemoteConf{} + + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) + } + + conf.S3SecretKey = "" + + fmt.Fprintf(writer, "%+v\n", conf) + + return nil + }) + +} + +func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error { + + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DeleteEntryRequest{ + Directory: filer.DirectoryEtcRemote, + Name: storageName, + IgnoreRecursiveError: false, + IsDeleteData: true, + IsRecursive: true, + IsFromOtherCluster: false, + Signatures: nil, + } + _, err := client.DeleteEntry(context.Background(), request) + + if err == nil { + fmt.Fprintf(writer, "removed: %s\n", storageName) + } + + return err + + }) + +} + +func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *filer_pb.RemoteConf) error { + + data, err := proto.Marshal(conf) + if err != nil { + return err + } + + if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name, data) + }); err != nil && err != filer_pb.ErrNotFound { + return err + } + + return nil + +} diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 6fc70c753..f76555bed 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -89,28 +89,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write continue } - aHasChanges, bHasChanges := true, true - for aHasChanges || bHasChanges { - // reset index db - aDB.Close() - bDB.Close() - aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb() - - // read index db - if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil { - return err - } - if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil { - return err - } - - // find and make up the differences - if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { - return err - } - if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { - return err - } + if err := c.syncTwoReplicas(aDB, bDB, a, verbose, writer, b, err, applyChanges, nonRepairThreshold); err != nil { + fmt.Fprintf(writer, "snyc volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } replicas = replicas[1:] } @@ -119,6 +99,33 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } +func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *needle_map.MemDb, a *VolumeReplica, verbose *bool, writer io.Writer, b *VolumeReplica, err error, applyChanges *bool, nonRepairThreshold *float64) error { + aHasChanges, bHasChanges := true, true + for aHasChanges || bHasChanges { + // reset index db + aDB.Close() + bDB.Close() + aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb() + + // read index db + if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil { + return err + } + if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil { + return err + } + + // find and make up the differences + if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { + return err + } + if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil { + return err + } + } + return nil +} + func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) (hasChanges bool, err error) { // find missing keys diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 538351fd0..9e6280788 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") + retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") if err = volFixReplicationCommand.Parse(args); err != nil { return nil } @@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) } @@ -154,64 +155,72 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { for _, vid := range underReplicatedVolumeIds { - replicas := volumeReplicas[vid] - replica := pickOneReplicaToCopyFrom(replicas) - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - foundNewLocation := false - hasSkippedCollection := false - keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) - for _, dst := range allLocations { - // check whether data nodes satisfy the constraints - if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { - // check collection name pattern - if *c.collectionPattern != "" { - matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) - if err != nil { - return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) - } - if !matched { - hasSkippedCollection = true - break - } - } - - // ask the volume server to replicate the volume - foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) + for i := 0; i < retryCount+1; i++ { + if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + continue + } + } + } + return +} - if !takeAction { +func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { + replicas := volumeReplicas[vid] + replica := pickOneReplicaToCopyFrom(replicas) + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + foundNewLocation := false + hasSkippedCollection := false + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + hasSkippedCollection = true break } + } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, - }) - if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) - } - return nil - }) + // ask the volume server to replicate the volume + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) - if err != nil { - return err + if !takeAction { + break + } + + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } + return nil + }) - // adjust free volume count - dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- - break + if err != nil { + return err } - } - if !foundNewLocation && !hasSkippedCollection { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) + // adjust free volume count + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + break } + } + if !foundNewLocation && !hasSkippedCollection { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } return nil } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 2ced0f571..bd3be4d89 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -164,7 +164,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint if verbose && entry.Entry.IsDirectory { fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) } - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil } @@ -311,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer files[i.vid].Write(buffer) } }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks) + dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index f21d0334c..6e0c19ae1 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -52,6 +52,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server") skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved") applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes") + retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry") if err = vsEvacuateCommand.Parse(args); err != nil { return nil } @@ -60,7 +61,13 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, return fmt.Errorf("need to specify volume server by -node=<host>:<port>") } - return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer) + for i := 0; i < *retryCount+1; i++ { + if err = volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil { + return nil + } + } + + return } |
