| author | chrislu <chris.lu@gmail.com> | 2024-04-08 11:03:14 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-04-08 11:03:14 -0700 |
| commit | cc1c69f312a967dfb636a677db910eb64ab65a06 (patch) | |
| tree | 086fa2b7ce6e7b80c4a00516cba2810655579709 /weed/filer | |
| parent | ccdd9cd8decf66089ac201b7c2ca1f5889582b93 (diff) | |
| parent | f08f95ac800b788e42290e58eb6444e094acf97f (diff) | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 6 |
| -rw-r--r-- | weed/filer/filerstore_wrapper.go | 2 |
| -rw-r--r-- | weed/filer/stream.go | 17 |
3 files changed, 16 insertions, 9 deletions
```diff
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 60a5c538b..7ea2f0353 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
 		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
 		return err
 	}
-	err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
+	err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
 	if err != nil {
 		return err
 	}
@@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
 	return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
 }
 
-func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
+func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
 
 	var shouldRetry bool
 	var totalWritten int
@@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
 		for _, urlString := range urlStrings {
 			var localProcessed int
 			var writeErr error
-			shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+			shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
 				if totalWritten > localProcessed {
 					toBeSkipped := totalWritten - localProcessed
 					if len(data) <= toBeSkipped {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index cc3e58363..986dadb77 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -303,7 +303,7 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
 			}
 		}
 	}
-	if count < limit && lastFileName <= prefix {
+	if count < limit && lastFileName < prefix {
 		notPrefixed = notPrefixed[:0]
 		lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
 			notPrefixed = append(notPrefixed, entry)
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 2686fd833..23a853b9a 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
 
 type DoStreamContent func(writer io.Writer) error
 
-func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
-	return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
+func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
+	return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
 }
 
-func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+type VolumeServerJwtFunction func(fileId string) string
+
+func noJwtFunc(string) string {
+	return ""
+}
+
+func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
 	glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
 	chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
 
@@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
 			}
 			urlStrings := fileId2Url[chunkView.FileId]
 			start := time.Now()
-			err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+			jwt := jwtFunc(chunkView.FileId)
+			err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 			offset += int64(chunkView.ViewSize)
 			remaining -= int64(chunkView.ViewSize)
 			stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
 }
 
 func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
-	streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
+	streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
 	if err != nil {
 		return err
 	}
```
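
The stream.go changes thread a per-file JWT through content streaming: `PrepareStreamContent` now takes a `VolumeServerJwtFunction`, and `noJwtFunc` preserves the old unauthenticated behavior for `StreamContent`. Below is a minimal, self-contained sketch of how a caller might build such a function; the `signReadJwt` helper and its HMAC signing key are hypothetical stand-ins, not SeaweedFS's actual security package.

```go
// Sketch: supplying a per-file JWT when streaming chunks.
// VolumeServerJwtFunction mirrors the type added in weed/filer/stream.go;
// signReadJwt and its signing key are illustrative only.
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// VolumeServerJwtFunction maps a file id to the JWT presented to the volume server.
type VolumeServerJwtFunction func(fileId string) string

// noJwtFunc matches the default added in the diff: no token at all.
func noJwtFunc(string) string { return "" }

// signReadJwt is a hypothetical token generator; the real filer would derive
// read JWTs from its configured signing key instead.
func signReadJwt(key []byte, fileId string) string {
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(fileId))
	return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

func main() {
	key := []byte("example-signing-key")

	// A filer with volume-server read authorization enabled could pass a
	// closure like this to PrepareStreamContent.
	jwtFunc := VolumeServerJwtFunction(func(fileId string) string {
		return signReadJwt(key, fileId)
	})

	fmt.Println("jwt for 3,01637037d6:", jwtFunc("3,01637037d6"))
	fmt.Println("jwt when auth is off:", noJwtFunc("3,01637037d6"))
}
```

Call sites that do not need authentication keep passing `noJwtFunc`, exactly as `StreamContent` does in the diff.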

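On the fetch side, `retriedStreamFetchChunkData` now forwards the token to `util.ReadUrlAsStreamAuthenticated` instead of `util.ReadUrlAsStream`. The sketch below shows one way an authenticated streaming read could work, assuming the JWT is carried as an Authorization bearer header; `readUrlAsStreamAuthenticated` here is an illustrative stand-in, not the actual `weed/util` implementation.

```go
// Sketch of an authenticated streaming GET: attach the JWT (if any) to the
// request, then feed the response body to a callback in chunks.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func readUrlAsStreamAuthenticated(url, jwt string, fn func(data []byte)) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	if jwt != "" {
		// Assumption for illustration: the token travels as a bearer header.
		req.Header.Set("Authorization", "BEARER "+jwt)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("read %s: %s", url, resp.Status)
	}
	buf := make([]byte, 64*1024)
	for {
		n, err := resp.Body.Read(buf)
		if n > 0 {
			fn(buf[:n])
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	total := 0
	err := readUrlAsStreamAuthenticated("http://127.0.0.1:8080/3,01637037d6?readDeleted=true", "example-token", func(data []byte) {
		total += len(data)
	})
	fmt.Println("bytes read:", total, "err:", err)
}
```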