aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer_cat.go118
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/filer/filechunk_manifest.go7
-rw-r--r--weed/filer/filechunks.go9
-rw-r--r--weed/filer/reader_at.go6
-rw-r--r--weed/filer/stream.go8
-rw-r--r--weed/replication/sink/filersink/filer_sink.go3
-rw-r--r--weed/wdclient/vid_map.go10
9 files changed, 147 insertions, 17 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 0df22b575..8f9cec087 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,6 +15,7 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerCat,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
new file mode 100644
index 000000000..a46098b04
--- /dev/null
+++ b/weed/command/filer_cat.go
@@ -0,0 +1,118 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "google.golang.org/grpc"
+ "math"
+ "net/url"
+ "os"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ filerCat FilerCatOptions
+)
+
+type FilerCatOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress string
+ filerClient filer_pb.SeaweedFilerClient
+ output *string
+}
+
+func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+ return func(fileId string) (targetUrls []string, err error) {
+ vid := filer.VolumeId(fileId)
+ resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return nil, err
+ }
+ locations := resp.LocationsMap[vid]
+ for _, loc := range locations.Locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+ }
+ return
+ }
+}
+
+func init() {
+ cmdFilerCat.Run = runFilerCat // break init cycle
+ filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout")
+}
+
+var cmdFilerCat = &Command{
+ UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file",
+ Short: "copy one file to local",
+ Long: `read one file to stdout or write to a file
+
+`,
+}
+
+func runFilerCat(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ if len(args) == 0 {
+ return false
+ }
+ filerSource := args[len(args)-1]
+
+ filerUrl, err := url.Parse(filerSource)
+ if err != nil {
+ fmt.Printf("The last argument should be a URL on filer: %v\n", err)
+ return false
+ }
+ urlPath := filerUrl.Path
+ if strings.HasSuffix(urlPath, "/") {
+ fmt.Printf("The last argument should be a file: %v\n", err)
+ return false
+ }
+
+ filerCat.filerAddress = filerUrl.Host
+ filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ dir, name := util.FullPath(urlPath).DirAndName()
+
+ writer := os.Stdout
+ if *filerCat.output != "" {
+
+ fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output)
+
+ f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+ if err != nil {
+ fmt.Printf("open file %s: %v\n", *filerCat.output, err)
+ return false
+ }
+ defer f.Close()
+ writer = f
+ }
+
+ pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ filerCat.filerClient = client
+
+ return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+ })
+
+ return true
+}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 93248f357..b95df696c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool {
}
urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
- fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
+ fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index f5ab36d37..845bfaec1 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"time"
@@ -38,7 +39,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
@@ -63,7 +64,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return
}
-func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
@@ -84,7 +85,7 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index c75a35f79..68f308a51 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"sort"
"sync"
@@ -52,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
}
-func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -71,7 +72,7 @@ func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_
return
}
-func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
if aErr != nil {
@@ -116,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
-func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -222,7 +223,7 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 6193dbd45..1f594a1bf 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -18,7 +18,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
@@ -31,9 +31,7 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
-type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
-
-func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index cffdc8303..f0042a0ff 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -13,16 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
- chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
+ chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
- urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
+ urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@@ -86,7 +86,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index a58c8f296..9c0e4176f 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@@ -206,7 +207,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
-func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
if aErr != nil {
return nil, nil, aErr
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 773da0191..271baa132 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -15,6 +15,12 @@ const (
maxCursorIndex = 4096
)
+type HasLookupFileIdFunction interface {
+ GetLookupFileIdFunction() LookupFileIdFunctionType
+}
+
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
+
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
@@ -67,6 +73,10 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
+func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return vc.LookupFileId
+}
+
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {