aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go30
1 files changed, 28 insertions, 2 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 87280d6b0..b2ee00555 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -26,6 +27,30 @@ var getLookupFileIdBackoffSchedule = []time.Duration{
1800 * time.Millisecond,
}
+var (
+ jwtSigningReadKey security.SigningKey
+ jwtSigningReadKeyExpires int
+ loadJwtConfigOnce sync.Once
+)
+
+func loadJwtConfig() {
+ v := util.GetViper()
+ jwtSigningReadKey = security.SigningKey(v.GetString("jwt.signing.read.key"))
+ jwtSigningReadKeyExpires = v.GetInt("jwt.signing.read.expires_after_seconds")
+ if jwtSigningReadKeyExpires == 0 {
+ jwtSigningReadKeyExpires = 60
+ }
+}
+
+// JwtForVolumeServer generates a JWT token for volume server read operations if jwt.signing.read is configured
+func JwtForVolumeServer(fileId string) string {
+ loadJwtConfigOnce.Do(loadJwtConfig)
+ if len(jwtSigningReadKey) == 0 {
+ return ""
+ }
+ return string(security.GenJwtForVolumeServer(jwtSigningReadKey, jwtSigningReadKeyExpires, fileId))
+}
+
func HasData(entry *filer_pb.Entry) bool {
if len(entry.Content) > 0 {
@@ -152,7 +177,7 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien
}
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size)
+ streamFn, err := PrepareStreamContent(masterClient, JwtForVolumeServer, chunks, offset, size)
if err != nil {
return err
}
@@ -351,8 +376,9 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
}
var buffer bytes.Buffer
var shouldRetry bool
+ jwt := JwtForVolumeServer(chunkView.FileId)
for _, urlString := range urlStrings {
- shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {