aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-08 11:03:14 -0700
committerchrislu <chris.lu@gmail.com>2024-04-08 11:03:14 -0700
commitcc1c69f312a967dfb636a677db910eb64ab65a06 (patch)
tree086fa2b7ce6e7b80c4a00516cba2810655579709 /weed/filer
parentccdd9cd8decf66089ac201b7c2ca1f5889582b93 (diff)
parentf08f95ac800b788e42290e58eb6444e094acf97f (diff)
downloadseaweedfs-cc1c69f312a967dfb636a677db910eb64ab65a06.tar.xz
seaweedfs-cc1c69f312a967dfb636a677db910eb64ab65a06.zip
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunk_manifest.go6
-rw-r--r--weed/filer/filerstore_wrapper.go2
-rw-r--r--weed/filer/stream.go17
3 files changed, 16 insertions, 9 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 60a5c538b..7ea2f0353 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return err
}
- err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
+ err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
if err != nil {
return err
}
@@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
-func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
+func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
var shouldRetry bool
var totalWritten int
@@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe
for _, urlString := range urlStrings {
var localProcessed int
var writeErr error
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index cc3e58363..986dadb77 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -303,7 +303,7 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
}
}
}
- if count < limit && lastFileName <= prefix {
+ if count < limit && lastFileName < prefix {
notPrefixed = notPrefixed[:0]
lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
notPrefixed = append(notPrefixed, entry)
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 2686fd833..23a853b9a 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
type DoStreamContent func(writer io.Writer) error
-func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
- return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
+func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
+ return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
}
-func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+type VolumeServerJwtFunction func(fileId string) string
+
+func noJwtFunc(string) string {
+ return ""
+}
+
+func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
@@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
}
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
- err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+ jwt := jwtFunc(chunkView.FileId)
+ err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
offset += int64(chunkView.ViewSize)
remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
}
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
+ streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
if err != nil {
return err
}