aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-21 12:08:58 -0800
committerchrislu <chris.lu@gmail.com>2022-01-21 12:08:58 -0800
commitce2049cdb6f3082d854a2649b4eea53f9774ddb5 (patch)
tree4f28ee0b74ec3365f6d93e94fc77e209751393ee
parente47f63d15987d38bc721fa06dac137f51e78562d (diff)
downloadseaweedfs-ce2049cdb6f3082d854a2649b4eea53f9774ddb5.tar.xz
seaweedfs-ce2049cdb6f3082d854a2649b4eea53f9774ddb5.zip
refactoring, move genFn before saveFn
-rw-r--r--weed/shell/command_fs_meta_save.go20
-rw-r--r--weed/shell/command_volume_fsck.go48
2 files changed, 34 insertions, 34 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 2cbe83e21..ea909a527 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -78,15 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
cipherKey = util.GenCipherKey()
}
- err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) {
- sizeBuf := make([]byte, 4)
- for item := range outputChan {
- b := item.([]byte)
- util.Uint32toBytes(sizeBuf, uint32(len(b)))
- dst.Write(sizeBuf)
- dst.Write(b)
- }
- }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if !entry.Entry.IsDirectory {
ext := filepath.Ext(entry.Entry.Name)
if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil {
@@ -102,6 +94,14 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
outputChan <- bytes
return nil
+ }, func(outputChan chan interface{}) {
+ sizeBuf := make([]byte, 4)
+ for item := range outputChan {
+ b := item.([]byte)
+ util.Uint32toBytes(sizeBuf, uint32(len(b)))
+ dst.Write(sizeBuf)
+ dst.Write(b)
+ }
})
if err == nil {
@@ -112,7 +112,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
}
-func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error {
+func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error {
var wg sync.WaitGroup
wg.Add(1)
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 3302542aa..2d570fef3 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -145,22 +145,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
cookie uint32
path util.FullPath
}
- return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) {
- buffer := make([]byte, 16)
- for item := range outputChan {
- i := item.(*Item)
- if f, ok := files[i.vid]; ok {
- util.Uint64toBytes(buffer, i.fileKey)
- util.Uint32toBytes(buffer[8:], i.cookie)
- util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
- f.Write(buffer)
- f.Write([]byte(i.path))
- // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
- } else {
- fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
- }
- }
- }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
if verbose && entry.Entry.IsDirectory {
fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
}
@@ -178,6 +163,21 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
}
}
return nil
+ }, func(outputChan chan interface{}) {
+ buffer := make([]byte, 16)
+ for item := range outputChan {
+ i := item.(*Item)
+ if f, ok := files[i.vid]; ok {
+ util.Uint64toBytes(buffer, i.fileKey)
+ util.Uint32toBytes(buffer[8:], i.cookie)
+ util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
+ f.Write(buffer)
+ f.Write([]byte(i.path))
+ // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
+ } else {
+ fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
+ }
+ }
})
}
@@ -307,14 +307,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
vid uint32
fileKey uint64
}
- return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
- buffer := make([]byte, 8)
- for item := range outputChan {
- i := item.(*Item)
- util.Uint64toBytes(buffer, i.fileKey)
- files[i.vid].Write(buffer)
- }
- }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
if verbose {
@@ -330,6 +323,13 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
}
}
return nil
+ }, func(outputChan chan interface{}) {
+ buffer := make([]byte, 8)
+ for item := range outputChan {
+ i := item.(*Item)
+ util.Uint64toBytes(buffer, i.fileKey)
+ files[i.vid].Write(buffer)
+ }
})
}