diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_fs_du.go | 96 | ||||
| -rw-r--r-- | weed/shell/command_fs_ls.go | 99 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_notify.go | 45 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 144 | ||||
| -rw-r--r-- | weed/shell/command_fs_tree.go | 87 | ||||
| -rw-r--r-- | weed/shell/commands.go | 24 |
6 files changed, 197 insertions, 298 deletions
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 5e634c82a..b6e767f17 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -3,11 +3,13 @@ package shell import ( "context" "fmt" + "io" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "io" ) func init() { @@ -43,67 +45,42 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer path = path + "/" } - dir, name := filer2.FullPath(path).DirAndName() - - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - _, _, err = paginateDirectory(ctx, writer, client, dir, name, 1000) + var blockCount, byteCount uint64 + dir, name := filer2.FullPath(path).DirAndName() + blockCount, byteCount, err = duTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name) - return err + if name == "" && err == nil { + fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir) + } - }) + return } -func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int) (blockCount uint64, byteCount uint64, err error) { - - paginatedCount := -1 - startFromFileName := "" - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: dir, - Prefix: name, - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } +func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount uint64, byteCount uint64, err error) { - paginatedCount = len(resp.Entries) - - for _, entry := range resp.Entries { - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", dir, entry.Name) - if dir == "/" { - subDir = "/" + entry.Name - } - numBlock, numByte, err := paginateDirectory(ctx, writer, client, subDir, "", paginateSize) - if err == nil { - blockCount += numBlock - byteCount += numByte - } - } else { - blockCount += uint64(len(entry.Chunks)) - byteCount += filer2.TotalSize(entry.Chunks) + err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { + if entry.IsDirectory { + subDir := fmt.Sprintf("%s/%s", dir, entry.Name) + if dir == "/" { + subDir = "/" + entry.Name } - startFromFileName = entry.Name - - if name != "" && !entry.IsDirectory { - fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name) + numBlock, numByte, err := duTraverseDirectory(ctx, writer, filerClient, subDir, "") + if err == nil { + blockCount += numBlock + byteCount += numByte } + } else { + blockCount += uint64(len(entry.Chunks)) + byteCount += filer2.TotalSize(entry.Chunks) } - } - - if name == "" { - fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir) - } + if name != "" && !entry.IsDirectory { + fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name) + } + }) return - } func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { @@ -115,3 +92,20 @@ func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, }, filerGrpcAddress, env.option.GrpcDialOption) } + +type commandFilerClient struct { + env *CommandEnv + filerServer string + filerPort int64 +} + +func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *commandFilerClient { + return &commandFilerClient{ + env: env, + filerServer: filerServer, + filerPort: filerPort, + } +} +func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { + return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn) +} diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 6979635e1..01842083b 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -3,13 +3,14 @@ package shell import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" "os" "os/user" "strconv" "strings" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func init() { @@ -66,83 +67,51 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer } dir, name := filer2.FullPath(path).DirAndName() + entryCount := 0 - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - - return paginateOneDirectory(ctx, writer, client, dir, name, 1000, isLongFormat, showHidden) - - }) - -} - -func paginateOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int, isLongFormat, showHidden bool) (err error) { + err = filer2.ReadDirAllEntries(ctx, commandEnv.getFilerClient(filerServer, filerPort), dir, name, func(entry *filer_pb.Entry, isLast bool) { - entryCount := 0 - paginatedCount := -1 - startFromFileName := "" - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: dir, - Prefix: name, - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr + if !showHidden && strings.HasPrefix(entry.Name, ".") { return } - paginatedCount = len(resp.Entries) - - for _, entry := range resp.Entries { + entryCount++ - if !showHidden && strings.HasPrefix(entry.Name, ".") { - continue - } - - entryCount++ - - if isLongFormat { - fileMode := os.FileMode(entry.Attributes.FileMode) - userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName - if userName == "" { - if user, userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil { - userName = user.Username - } - } - groupName := "" - if len(groupNames) > 0 { - groupName = groupNames[0] + if isLongFormat { + fileMode := os.FileMode(entry.Attributes.FileMode) + userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName + if userName == "" { + if user, userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil { + userName = user.Username } - if groupName == "" { - if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil { - groupName = group.Name - } - } - - if dir == "/" { - // just for printing - dir = "" + } + groupName := "" + if len(groupNames) > 0 { + groupName = groupNames[0] + } + if groupName == "" { + if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil { + groupName = group.Name } - fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n", - fileMode, len(entry.Chunks), - userName, groupName, - filer2.TotalSize(entry.Chunks), dir, entry.Name) - } else { - fmt.Fprintf(writer, "%s\n", entry.Name) } - startFromFileName = entry.Name - + if dir == "/" { + // just for printing + dir = "" + } + fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n", + fileMode, len(entry.Chunks), + userName, groupName, + filer2.TotalSize(entry.Chunks), dir, entry.Name) + } else { + fmt.Fprintf(writer, "%s\n", entry.Name) } - } - if isLongFormat { + }) + + if isLongFormat && err == nil { fmt.Fprintf(writer, "total %d\n", entryCount) } return - } diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 4fe0e45a9..a898df7a0 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -5,11 +5,12 @@ import ( "fmt" "io" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) func init() { @@ -46,33 +47,33 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i ctx := context.Background() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + var dirCount, fileCount uint64 - var dirCount, fileCount uint64 + err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { - err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { - - if entry.IsDirectory { - dirCount++ - } else { - fileCount++ - } - - return notification.Queue.SendMessage( - string(parentPath.Child(entry.Name)), - &filer_pb.EventNotification{ - NewEntry: entry, - }, - ) + if entry.IsDirectory { + dirCount++ + } else { + fileCount++ + } - }) + notifyErr := notification.Queue.SendMessage( + string(parentPath.Child(entry.Name)), + &filer_pb.EventNotification{ + NewEntry: entry, + }, + ) - if err == nil { - fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount) + if notifyErr != nil { + fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr) } - return err - }) + if err == nil { + fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount) + } + + return err + } diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index dd5e9defb..4ff00c64b 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -54,74 +54,69 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) - if err != nil { - return err + filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + if parseErr != nil { + return parseErr } ctx := context.Background() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - - t := time.Now() - fileName := *outputFileName - if fileName == "" { - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) - } - - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil - } - defer dst.Close() - - var dirCount, fileCount uint64 - - err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { - - protoMessage := &filer_pb.FullEntry{ - Dir: string(parentPath), - Entry: entry, - } + t := time.Now() + fileName := *outputFileName + if fileName == "" { + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } - bytes, err := proto.Marshal(protoMessage) - if err != nil { - return fmt.Errorf("marshall error: %v", err) - } + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", fileName, openErr) + } + defer dst.Close() - sizeBuf := make([]byte, 4) - util.Uint32toBytes(sizeBuf, uint32(len(bytes))) + var dirCount, fileCount uint64 - dst.Write(sizeBuf) - dst.Write(bytes) + err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { - if entry.IsDirectory { - atomic.AddUint64(&dirCount, 1) - } else { - atomic.AddUint64(&fileCount, 1) - } + protoMessage := &filer_pb.FullEntry{ + Dir: string(parentPath), + Entry: entry, + } - if *verbose { - println(parentPath.Child(entry.Name)) - } + bytes, err := proto.Marshal(protoMessage) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } - return nil + sizeBuf := make([]byte, 4) + util.Uint32toBytes(sizeBuf, uint32(len(bytes))) - }) + dst.Write(sizeBuf) + dst.Write(bytes) - if err == nil { - fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) - fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + if entry.IsDirectory { + atomic.AddUint64(&dirCount, 1) + } else { + atomic.AddUint64(&fileCount, 1) } - return err + if *verbose { + println(parentPath.Child(entry.Name)) + } }) + if err == nil { + fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) + fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + } + + return err + } -func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, - parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { +func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, + parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { K := 5 @@ -143,7 +138,7 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.Seawee continue } dir := t.(filer2.FullPath) - processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn) + processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn) if processErr != nil { err = processErr } @@ -156,47 +151,22 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.Seawee return } -func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, +func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, - fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { - - paginatedCount := -1 - startFromFileName := "" - paginateSize := 1000 - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: string(parentPath), - Prefix: "", - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } + fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - paginatedCount = len(resp.Entries) + return filer2.ReadDirAllEntries(ctx, filerClient, string(parentPath), "", func(entry *filer_pb.Entry, isLast bool) { - for _, entry := range resp.Entries { + fn(parentPath, entry) - if err = fn(parentPath, entry); err != nil { - return err + if entry.IsDirectory { + subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) + if parentPath == "/" { + subDir = "/" + entry.Name } - - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) - if parentPath == "/" { - subDir = "/" + entry.Name - } - jobQueueWg.Add(1) - queue.Enqueue(filer2.FullPath(subDir)) - } - startFromFileName = entry.Name + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) } - } - - return + }) } diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 8474e43ea..a4524f341 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -3,10 +3,11 @@ package shell import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" "strings" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) func init() { @@ -38,75 +39,45 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ ctx := context.Background() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name, newPrefix(), -1) - dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, dir, name, newPrefix(), -1) - - if terr == nil { - fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) - } - - return terr + if terr == nil { + fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) + } - }) + return terr } -func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { - - paginatedCount := -1 - startFromFileName := "" - paginateSize := 1000 - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: dir, - Prefix: name, - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } - paginatedCount = len(resp.Entries) - if paginatedCount > 0 { - prefix.addMarker(level) - } +func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { - for i, entry := range resp.Entries { + prefix.addMarker(level) - if level < 0 && name != "" { - if entry.Name != name { - break - } + err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { + if level < 0 && name != "" { + if entry.Name != name { + return } + } - // 0.1% wrong prefix here, but fixing it would need to paginate to the next batch first - isLast := paginatedCount < paginateSize && i == paginatedCount-1 - fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name) - - if entry.IsDirectory { - directoryCount++ - subDir := fmt.Sprintf("%s/%s", dir, entry.Name) - if dir == "/" { - subDir = "/" + entry.Name - } - dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, subDir, "", prefix, level+1) - directoryCount += dirCount - fileCount += fCount - err = terr - } else { - fileCount++ - } - startFromFileName = entry.Name + fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name) + if entry.IsDirectory { + directoryCount++ + subDir := fmt.Sprintf("%s/%s", dir, entry.Name) + if dir == "/" { + subDir = "/" + entry.Name + } + dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, filerClient, subDir, "", prefix, level+1) + directoryCount += dirCount + fileCount += fCount + err = terr + } else { + fileCount++ } - } + }) return - } type Prefix struct { diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b642ec253..a6a0f7dec 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -9,10 +9,11 @@ import ( "strconv" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/wdclient" - "google.golang.org/grpc" ) type ShellOptions struct { @@ -71,26 +72,19 @@ func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, fi return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: dir, - Prefix: name, - StartFromFileName: name, - InclusiveStartFrom: true, - Limit: 1, + resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, }) - if listErr != nil { - return listErr + if lookupErr != nil { + return lookupErr } - if len(resp.Entries) == 0 { + if resp.Entry == nil { return fmt.Errorf("entry not found") } - if resp.Entries[0].Name != name { - return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name) - } - - if !resp.Entries[0].IsDirectory { + if !resp.Entry.IsDirectory { return fmt.Errorf("not a directory") } |
