aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_fs_rm.go100
-rw-r--r--weed/shell/command_remote_configure.go142
-rw-r--r--weed/shell/command_volume_check_disk.go51
-rw-r--r--weed/shell/command_volume_fix_replication.go101
-rw-r--r--weed/shell/command_volume_fsck.go4
-rw-r--r--weed/shell/command_volume_server_evacuate.go9
6 files changed, 336 insertions, 71 deletions
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
new file mode 100644
index 000000000..b383366ca
--- /dev/null
+++ b/weed/shell/command_fs_rm.go
@@ -0,0 +1,100 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsRm{})
+}
+
+type commandFsRm struct {
+}
+
+func (c *commandFsRm) Name() string {
+ return "fs.rm"
+}
+
+func (c *commandFsRm) Help() string {
+ return `remove file and directory entries
+
+ fs.rm [-rf] <entry1> <entry2> ...
+
+ fs.rm /dir/file_name1 dir/file_name2
+ fs.rm /dir
+
+ The option "-r" can be recursive.
+ The option "-f" can be ignored by recursive error.
+`
+}
+
+func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ isRecursive := false
+ ignoreRecursiveError := false
+ var entiries []string
+ for _, arg := range args {
+ if !strings.HasPrefix(arg, "-") {
+ entiries = append(entiries, arg)
+ continue
+ }
+ for _, t := range arg {
+ switch t {
+ case 'r':
+ isRecursive = true
+ case 'f':
+ ignoreRecursiveError = true
+ }
+ }
+ }
+ if len(entiries) < 1 {
+ return fmt.Errorf("need to have arguments")
+ }
+
+ commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ for _, entry := range entiries {
+ targetPath, err := commandEnv.parseUrl(entry)
+ if err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ continue
+ }
+
+ targetDir, targetName := util.FullPath(targetPath).DirAndName()
+
+ lookupRequest := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: targetDir,
+ Name: targetName,
+ }
+ _, err = filer_pb.LookupEntry(client, lookupRequest)
+ if err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ continue
+ }
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: targetDir,
+ Name: targetName,
+ IgnoreRecursiveError: ignoreRecursiveError,
+ IsDeleteData: true,
+ IsRecursive: isRecursive,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ }
+ if resp, err := client.DeleteEntry(context.Background(), request); err != nil {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, err)
+ } else {
+ if resp.Error != "" {
+ fmt.Fprintf(writer, "rm: %s: %v\n", targetPath, resp.Error)
+ }
+ }
+ }
+ return nil
+ })
+
+ return
+}
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
new file mode 100644
index 000000000..567143ce1
--- /dev/null
+++ b/weed/shell/command_remote_configure.go
@@ -0,0 +1,142 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "io"
+ "regexp"
+)
+
+func init() {
+ Commands = append(Commands, &commandRemoteConfigure{})
+}
+
+type commandRemoteConfigure struct {
+}
+
+func (c *commandRemoteConfigure) Name() string {
+ return "remote.configure"
+}
+
+func (c *commandRemoteConfigure) Help() string {
+ return `remote storage configuration
+
+ # see the current configurations
+ remote.configure
+
+ # set or update a configuration
+ remote.configure -name=cloud1 -type=s3 -access_key=xxx -secret_key=yyy
+
+ # delete one configuration
+ remote.configure -delete -name=cloud1
+
+`
+}
+
+var (
+ isAlpha = regexp.MustCompile(`^[A-Za-z][A-Za-z0-9]*$`).MatchString
+)
+
+func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ conf := &filer_pb.RemoteConf{}
+
+ remoteConfigureCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ isDelete := remoteConfigureCommand.Bool("delete", false, "delete one remote storage by its name")
+
+ remoteConfigureCommand.StringVar(&conf.Name, "name", "", "a short name to identify the remote storage")
+ remoteConfigureCommand.StringVar(&conf.Type, "type", "s3", "storage type, currently only support s3")
+
+ remoteConfigureCommand.StringVar(&conf.S3AccessKey, "s3.access_key", "", "s3 access key")
+ remoteConfigureCommand.StringVar(&conf.S3SecretKey, "s3.secret_key", "", "s3 secret key")
+ remoteConfigureCommand.StringVar(&conf.S3Region, "s3.region", "us-east-2", "s3 region")
+ remoteConfigureCommand.StringVar(&conf.S3Endpoint, "s3.endpoint", "", "endpoint for s3-compatible local object store")
+
+ if err = remoteConfigureCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if conf.Name == "" {
+ return c.listExistingRemoteStorages(commandEnv, writer)
+ }
+
+ if !isAlpha(conf.Name) {
+ return fmt.Errorf("only letters and numbers allowed in name: %v", conf.Name)
+ }
+
+ if *isDelete {
+ return c.deleteRemoteStorage(commandEnv, writer, conf.Name)
+ }
+
+ return c.saveRemoteStorage(commandEnv, writer, conf)
+
+}
+
+func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandEnv, writer io.Writer) error {
+
+ return filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(filer.DirectoryEtcRemote), "", func(entry *filer_pb.Entry, isLast bool) error {
+ if len(entry.Content) == 0 {
+ fmt.Fprintf(writer, "skipping %s\n", entry.Name)
+ return nil
+ }
+ conf := &filer_pb.RemoteConf{}
+
+ if err := proto.Unmarshal(entry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
+ }
+
+ conf.S3SecretKey = ""
+
+ fmt.Fprintf(writer, "%+v\n", conf)
+
+ return nil
+ })
+
+}
+
+func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error {
+
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: filer.DirectoryEtcRemote,
+ Name: storageName,
+ IgnoreRecursiveError: false,
+ IsDeleteData: true,
+ IsRecursive: true,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ }
+ _, err := client.DeleteEntry(context.Background(), request)
+
+ if err == nil {
+ fmt.Fprintf(writer, "removed: %s\n", storageName)
+ }
+
+ return err
+
+ })
+
+}
+
+func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, writer io.Writer, conf *filer_pb.RemoteConf) error {
+
+ data, err := proto.Marshal(conf)
+ if err != nil {
+ return err
+ }
+
+ if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name, data)
+ }); err != nil && err != filer_pb.ErrNotFound {
+ return err
+ }
+
+ return nil
+
+}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 6fc70c753..f76555bed 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -89,28 +89,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
continue
}
- aHasChanges, bHasChanges := true, true
- for aHasChanges || bHasChanges {
- // reset index db
- aDB.Close()
- bDB.Close()
- aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
-
- // read index db
- if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
- return err
- }
- if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
- return err
- }
-
- // find and make up the differences
- if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
- return err
- }
- if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
- return err
- }
+ if err := c.syncTwoReplicas(aDB, bDB, a, verbose, writer, b, err, applyChanges, nonRepairThreshold); err != nil {
+ fmt.Fprintf(writer, "snyc volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
replicas = replicas[1:]
}
@@ -119,6 +99,33 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
return nil
}
+func (c *commandVolumeCheckDisk) syncTwoReplicas(aDB *needle_map.MemDb, bDB *needle_map.MemDb, a *VolumeReplica, verbose *bool, writer io.Writer, b *VolumeReplica, err error, applyChanges *bool, nonRepairThreshold *float64) error {
+ aHasChanges, bHasChanges := true, true
+ for aHasChanges || bHasChanges {
+ // reset index db
+ aDB.Close()
+ bDB.Close()
+ aDB, bDB = needle_map.NewMemDb(), needle_map.NewMemDb()
+
+ // read index db
+ if err := c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, a.location.dataNode.Id, *verbose, writer); err != nil {
+ return err
+ }
+ if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, b.location.dataNode.Id, *verbose, writer); err != nil {
+ return err
+ }
+
+ // find and make up the differences
+ if aHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
+ return err
+ }
+ if bHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, *verbose, writer, *applyChanges, *nonRepairThreshold); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (c *commandVolumeCheckDisk) doVolumeCheckDisk(subtrahend, minuend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64) (hasChanges bool, err error) {
// find missing keys
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 538351fd0..9e6280788 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
+ retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
}
@@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
// find the most under populated data nodes
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
}
@@ -154,64 +155,72 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
for _, vid := range underReplicatedVolumeIds {
- replicas := volumeReplicas[vid]
- replica := pickOneReplicaToCopyFrom(replicas)
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
- foundNewLocation := false
- hasSkippedCollection := false
- keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
- fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
- for _, dst := range allLocations {
- // check whether data nodes satisfy the constraints
- if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
- // check collection name pattern
- if *c.collectionPattern != "" {
- matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
- if err != nil {
- return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
- }
- if !matched {
- hasSkippedCollection = true
- break
- }
- }
-
- // ask the volume server to replicate the volume
- foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
+ for i := 0; i < retryCount+1; i++ {
+ if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ continue
+ }
+ }
+ }
+ return
+}
- if !takeAction {
+func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ foundNewLocation := false
+ hasSkippedCollection := false
+ keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
+ fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
+ for _, dst := range allLocations {
+ // check whether data nodes satisfy the constraints
+ if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ hasSkippedCollection = true
break
}
+ }
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: replica.info.Id,
- SourceDataNode: replica.location.dataNode.Id,
- })
- if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
- }
- return nil
- })
+ // ask the volume server to replicate the volume
+ foundNewLocation = true
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
- if err != nil {
- return err
+ if !takeAction {
+ break
+ }
+
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
+ })
+ if replicateErr != nil {
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
+ return nil
+ })
- // adjust free volume count
- dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
- break
+ if err != nil {
+ return err
}
- }
- if !foundNewLocation && !hasSkippedCollection {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
+ // adjust free volume count
+ dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
+ break
}
+ }
+ if !foundNewLocation && !hasSkippedCollection {
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
return nil
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 2ced0f571..bd3be4d89 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -164,7 +164,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
if verbose && entry.Entry.IsDirectory {
fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
}
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}
@@ -311,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
files[i.vid].Write(buffer)
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index f21d0334c..6e0c19ae1 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -52,6 +52,7 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
+ retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
if err = vsEvacuateCommand.Parse(args); err != nil {
return nil
}
@@ -60,7 +61,13 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
}
- return volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer)
+ for i := 0; i < *retryCount+1; i++ {
+ if err = volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
+ return nil
+ }
+ }
+
+ return
}