diff options
Diffstat (limited to 'weed/filer2/filer_client_util.go')
| -rw-r--r-- | weed/filer2/filer_client_util.go | 94 |
1 files changed, 50 insertions, 44 deletions
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go index 7e093eea2..1c1fa6a5b 100644 --- a/weed/filer2/filer_client_util.go +++ b/weed/filer2/filer_client_util.go @@ -3,6 +3,8 @@ package filer2 import ( "context" "fmt" + "io" + "math" "strings" "sync" @@ -20,10 +22,11 @@ func VolumeId(fileId string) string { } type FilerClient interface { - WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error + WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error + AdjustedUrl(hostAndPort string) string } -func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath string, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { +func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { var vids []string for _, chunkView := range chunkViews { vids = append(vids, VolumeId(chunkView.FileId)) @@ -31,10 +34,10 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s vid2Locations := make(map[string]*filer_pb.Locations) - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) if err != nil { @@ -65,20 +68,16 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s return } + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], - !chunkView.IsFullChunk) + n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)]) if err != nil { - glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) + glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err) err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) + volumeServerAddress, chunkView.FileId, err) return } @@ -91,68 +90,75 @@ func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath s return } -func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string) (entry *filer_pb.Entry, err error) { +func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) { - dir, name := FullPath(fullFilePath).DirAndName() + dir, name := fullFilePath.DirAndName() - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, } - glog.V(3).Infof("read %s request: %v", fullFilePath, request) - resp, err := client.LookupDirectoryEntry(ctx, request) + // glog.V(3).Infof("read %s request: %v", fullFilePath, request) + resp, err := filer_pb.LookupEntry(client, request) if err != nil { - if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) { + if err == filer_pb.ErrNotFound { return nil } - glog.V(3).Infof("read %s attr %v: %v", fullFilePath, request, err) + glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err) return err } - if resp.Entry != nil { - entry = resp.Entry + if resp.Entry == nil { + // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry) + return nil } + entry = resp.Entry return nil }) return } -func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) { +func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) { - err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - paginationLimit := 1024 + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { lastEntryName := "" - for { - - request := &filer_pb.ListEntriesRequest{ - Directory: fullDirPath, - StartFromFileName: lastEntryName, - Limit: uint32(paginationLimit), - } + request := &filer_pb.ListEntriesRequest{ + Directory: string(fullDirPath), + Prefix: prefix, + StartFromFileName: lastEntryName, + Limit: math.MaxUint32, + } - glog.V(3).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - return fmt.Errorf("list %s: %v", fullDirPath, err) - } + glog.V(3).Infof("read directory: %v", request) + stream, err := client.ListEntries(context.Background(), request) + if err != nil { + return fmt.Errorf("list %s: %v", fullDirPath, err) + } - for _, entry := range resp.Entries { - fn(entry) - lastEntryName = entry.Name + var prevEntry *filer_pb.Entry + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + if prevEntry != nil { + fn(prevEntry, true) + } + break + } else { + return recvErr + } } - - if len(resp.Entries) < paginationLimit { - break + if prevEntry != nil { + fn(prevEntry, false) } - + prevEntry = resp.Entry } return nil |
