aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunk_manifest.go5
-rw-r--r--weed/filer/stream.go30
2 files changed, 31 insertions, 4 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 80a741cf5..b04244669 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -109,7 +109,8 @@ func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileI
glog.ErrorfCtx(ctx, "operation LookupFileId %s failed, err: %v", fileId, err)
return err
}
- err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
+ jwt := JwtForVolumeServer(fileId)
+ err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, jwt, cipherKey, isGzipped, true, 0, 0)
if err != nil {
return err
}
@@ -150,7 +151,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
retriedCnt++
var localProcessed int
var writeErr error
- shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
// Check for context cancellation during data processing
select {
case <-ctx.Done():
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 87280d6b0..b2ee00555 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -26,6 +27,30 @@ var getLookupFileIdBackoffSchedule = []time.Duration{
1800 * time.Millisecond,
}
+var (
+ jwtSigningReadKey security.SigningKey
+ jwtSigningReadKeyExpires int
+ loadJwtConfigOnce sync.Once
+)
+
+func loadJwtConfig() {
+ v := util.GetViper()
+ jwtSigningReadKey = security.SigningKey(v.GetString("jwt.signing.read.key"))
+ jwtSigningReadKeyExpires = v.GetInt("jwt.signing.read.expires_after_seconds")
+ if jwtSigningReadKeyExpires == 0 {
+ jwtSigningReadKeyExpires = 60
+ }
+}
+
+// JwtForVolumeServer generates a JWT token for volume server read operations if jwt.signing.read is configured
+func JwtForVolumeServer(fileId string) string {
+ loadJwtConfigOnce.Do(loadJwtConfig)
+ if len(jwtSigningReadKey) == 0 {
+ return ""
+ }
+ return string(security.GenJwtForVolumeServer(jwtSigningReadKey, jwtSigningReadKeyExpires, fileId))
+}
+
func HasData(entry *filer_pb.Entry) bool {
if len(entry.Content) > 0 {
@@ -152,7 +177,7 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien
}
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
+ streamFn, err := PrepareStreamContent(masterClient, JwtForVolumeServer, chunks, offset, size)
if err != nil {
return err
}
@@ -351,8 +376,9 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
}
var buffer bytes.Buffer
var shouldRetry bool
+ jwt := JwtForVolumeServer(chunkView.FileId)
for _, urlString := range urlStrings {
- shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {