aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer_client_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer_client_util.go')
-rw-r--r--weed/filer2/filer_client_util.go94
1 files changed, 50 insertions, 44 deletions
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
index 7e093eea2..1c1fa6a5b 100644
--- a/weed/filer2/filer_client_util.go
+++ b/weed/filer2/filer_client_util.go
@@ -3,6 +3,8 @@ package filer2
import (
"context"
"fmt"
+ "io"
+ "math"
"strings"
"sync"
@@ -20,10 +22,11 @@ func VolumeId(fileId string) string {
}
type FilerClient interface {
- WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error
+ WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
+ AdjustedUrl(hostAndPort string) string
}
-func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
+func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
var vids []string
for _, chunkView := range chunkViews {
vids = append(vids, VolumeId(chunkView.FileId))
@@ -31,10 +34,10 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
vid2Locations := make(map[string]*filer_pb.Locations)
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
if err != nil {
@@ -65,20 +68,16 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
return
}
+ volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
var n int64
- n, err = util.ReadUrl(
- fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
- chunkView.Offset,
- int(chunkView.Size),
- buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
- !chunkView.IsFullChunk)
+ n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)])
if err != nil {
- glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
+ glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
err = fmt.Errorf("failed to read http://%s/%s: %v",
- locations.Locations[0].Url, chunkView.FileId, err)
+ volumeServerAddress, chunkView.FileId, err)
return
}
@@ -91,68 +90,75 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s
return
}
-func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) {
+func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
- dir, name := FullPath(fullFilePath).DirAndName()
+ dir, name := fullFilePath.DirAndName()
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
}
- glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := client.LookupDirectoryEntry(ctx, request)
+ // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
+ resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
- if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
+ if err == filer_pb.ErrNotFound {
return nil
}
- glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err)
+ glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
return err
}
- if resp.Entry != nil {
- entry = resp.Entry
+ if resp.Entry == nil {
+ // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
+ return nil
}
+ entry = resp.Entry
return nil
})
return
}
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
+func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
- err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- paginationLimit := 1024
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
lastEntryName := ""
- for {
-
- request := &filer_pb.ListEntriesRequest{
- Directory: fullDirPath,
- StartFromFileName: lastEntryName,
- Limit: uint32(paginationLimit),
- }
+ request := &filer_pb.ListEntriesRequest{
+ Directory: string(fullDirPath),
+ Prefix: prefix,
+ StartFromFileName: lastEntryName,
+ Limit: math.MaxUint32,
+ }
- glog.V(3).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
+ glog.V(3).Infof("read directory: %v", request)
+ stream, err := client.ListEntries(context.Background(), request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
- for _, entry := range resp.Entries {
- fn(entry)
- lastEntryName = entry.Name
+ var prevEntry *filer_pb.Entry
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ if prevEntry != nil {
+ fn(prevEntry, true)
+ }
+ break
+ } else {
+ return recvErr
+ }
}
-
- if len(resp.Entries) < paginationLimit {
- break
+ if prevEntry != nil {
+ fn(prevEntry, false)
}
-
+ prevEntry = resp.Entry
}
return nil