aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/commands.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/shell/commands.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/shell/commands.go')
-rw-r--r--weed/shell/commands.go79
1 files changed, 46 insertions, 33 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index a6a0f7dec..0e285214b 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -1,19 +1,19 @@
package shell
import (
- "context"
"fmt"
"io"
"net/url"
- "path/filepath"
"strconv"
"strings"
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks"
)
type ShellOptions struct {
@@ -29,6 +29,7 @@ type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option ShellOptions
+ locker *exclusive_locks.ExclusiveLocker
}
type command interface {
@@ -42,55 +43,67 @@ var (
)
func NewCommandEnv(options ShellOptions) *CommandEnv {
- return &CommandEnv{
- env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(context.Background(),
- options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
- option: options,
+ ce := &CommandEnv{
+ env: make(map[string]string),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
+ option: options,
}
+ ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
+ return ce
}
-func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) {
+func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
if strings.HasPrefix(input, "http") {
- return parseFilerUrl(input)
+ err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
+ return
}
if !strings.HasPrefix(input, "/") {
- input = filepath.ToSlash(filepath.Join(ce.option.Directory, input))
+ input = util.Join(ce.option.Directory, input)
}
- return ce.option.FilerHost, ce.option.FilerPort, input, err
+ return input, err
}
-func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+func (ce *CommandEnv) isDirectory(path string) bool {
- return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil
+ return ce.checkDirectory(path) == nil
}
-func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+func (ce *CommandEnv) confirmIsLocked() error {
- dir, name := filer2.FullPath(path).DirAndName()
+ if ce.locker.IsLocking() {
+ return nil
+ }
- return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ return fmt.Errorf("need to lock to continue")
- resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- })
- if lookupErr != nil {
- return lookupErr
- }
+}
- if resp.Entry == nil {
- return fmt.Errorf("entry not found")
- }
+func (ce *CommandEnv) checkDirectory(path string) error {
- if !resp.Entry.IsDirectory {
- return fmt.Errorf("not a directory")
- }
+ dir, name := util.FullPath(path).DirAndName()
- return nil
- })
+ exists, err := filer_pb.Exists(ce, dir, name, true)
+
+ if !exists {
+ return fmt.Errorf("%s is not a directory", path)
+ }
+
+ return err
+
+}
+
+var _ = filer_pb.FilerClient(&CommandEnv{})
+
+func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000)
+ return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn)
+
+}
+func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
@@ -107,7 +120,7 @@ func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path
}
path = u.Path
} else {
- err = fmt.Errorf("path should have full url http://<filer_server>:<port>/path/to/dirOrFile : %s", entryPath)
+ err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
}
return
}