diff options
| author | chrislu <chris.lu@gmail.com> | 2024-04-08 11:03:14 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-04-08 11:03:14 -0700 |
| commit | cc1c69f312a967dfb636a677db910eb64ab65a06 (patch) | |
| tree | 086fa2b7ce6e7b80c4a00516cba2810655579709 /weed | |
| parent | ccdd9cd8decf66089ac201b7c2ca1f5889582b93 (diff) | |
| parent | f08f95ac800b788e42290e58eb6444e094acf97f (diff) | |
| download | seaweedfs-cc1c69f312a967dfb636a677db910eb64ab65a06.tar.xz seaweedfs-cc1c69f312a967dfb636a677db910eb64ab65a06.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
28 files changed, 322 insertions, 201 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 60a5c538b..7ea2f0353 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -107,7 +107,7 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return err } - err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0) + err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) if err != nil { return err } @@ -123,7 +123,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { +func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { var shouldRetry bool var totalWritten int @@ -132,7 +132,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKe for _, urlString := range urlStrings { var localProcessed int var writeErr error - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index cc3e58363..986dadb77 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -303,7 +303,7 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u } } } - if count < limit && lastFileName <= prefix { + if count < limit && lastFileName < prefix { notPrefixed = notPrefixed[:0] lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { notPrefixed = append(notPrefixed, entry) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 2686fd833..23a853b9a 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -69,11 +69,17 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R type DoStreamContent func(writer io.Writer) error -func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { - return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0) +func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { + return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0) } -func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { +type VolumeServerJwtFunction func(fileId string) string + +func noJwtFunc(string) string { + return "" +} + +func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks)) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) @@ -119,7 +125,8 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() - err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + jwt := jwtFunc(chunkView.FileId) + err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) offset += int64(chunkView.ViewSize) remaining -= int64(chunkView.ViewSize) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) @@ -143,7 +150,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc } func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size) + streamFn, err := PrepareStreamContent(masterClient, noJwtFunc, chunks, offset, size) if err != nil { return err } diff --git a/weed/ftpd/ftp_server.go b/weed/ftpd/ftp_server.go index d51f3624d..7334fa3c7 100644 --- a/weed/ftpd/ftp_server.go +++ b/weed/ftpd/ftp_server.go @@ -29,7 +29,7 @@ type SftpServer struct { var _ = ftpserver.MainDriver(&SftpServer{}) -// NewServer returns a new FTP server driver +// NewFtpServer returns a new FTP server driver func NewFtpServer(ftpListener net.Listener, option *FtpServerOption) (*SftpServer, error) { var err error server := &SftpServer{ diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index b6b870c3c..1b7a0146d 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "google.golang.org/grpc" "sync" @@ -193,6 +194,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest }) if lastError != nil { + stats.FilerHandlerCounter.WithLabelValues(stats.ErrorChunkAssign).Inc() continue } @@ -262,6 +264,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a WritableVolumeCount: so.VolumeGrowthCount, } if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" { + ar.WritableVolumeCount = uint32(count) altRequest = &VolumeAssignRequest{ Count: uint64(count), Replication: so.Replication, diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go index 04548cc6f..0a156cfce 100644 --- a/weed/s3api/auth_signature_v4.go +++ b/weed/s3api/auth_signature_v4.go @@ -311,7 +311,7 @@ func parseSignature(signElement string) (string, s3err.ErrorCode) { return signature, s3err.ErrNone } -// doesPolicySignatureMatch - Verify query headers with post policy +// doesPolicySignatureV4Match - Verify query headers with post policy // - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html // // returns ErrNone if the signature matches. diff --git a/weed/s3api/auto_signature_v4_test.go b/weed/s3api/auto_signature_v4_test.go index ccee8b885..6ff67b5bf 100644 --- a/weed/s3api/auto_signature_v4_test.go +++ b/weed/s3api/auto_signature_v4_test.go @@ -262,7 +262,7 @@ func getMD5HashBase64(data []byte) string { return base64.StdEncoding.EncodeToString(getMD5Sum(data)) } -// getSHA256Hash returns SHA-256 sum of given data. +// getSHA256Sum returns SHA-256 sum of given data. func getSHA256Sum(data []byte) []byte { hash := sha256.New() hash.Write(data) diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 765a5679e..e9cd6a0c4 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -1,11 +1,12 @@ package s3api import ( + "cmp" "encoding/hex" "encoding/xml" "fmt" - "github.com/google/uuid" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/stats" "golang.org/x/exp/slices" "math" "path/filepath" @@ -16,12 +17,19 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" + "github.com/google/uuid" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) +const ( + multipartExt = ".part" + multiPartMinSize = 5 * 1024 * 1024 +) + type InitiateMultipartUploadResult struct { XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"` s3.CreateMultipartUploadOutput @@ -70,61 +78,129 @@ type CompleteMultipartUploadResult struct { func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { glog.V(2).Infof("completeMultipartUpload input %v", input) - - completedParts := parts.Parts - slices.SortFunc(completedParts, func(a, b CompletedPart) int { - return a.PartNumber - b.PartNumber - }) + if len(parts.Parts) == 0 { + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() + return nil, s3err.ErrNoSuchUpload + } + completedPartNumbers := []int{} + completedPartMap := make(map[int][]string) + for _, part := range parts.Parts { + if _, ok := completedPartMap[part.PartNumber]; !ok { + completedPartNumbers = append(completedPartNumbers, part.PartNumber) + } + completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag) + } + sort.Ints(completedPartNumbers) uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId - entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList) - if err != nil || len(entries) == 0 { + if err != nil { glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() + return nil, s3err.ErrNoSuchUpload + } + + if len(entries) == 0 { + entryName, dirName := s3a.getEntryNameAndDir(input) + if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { + if uploadId, ok := entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id]; ok && *input.UploadId == string(uploadId) { + return &CompleteMultipartUploadResult{ + CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{ + Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Bucket: input.Bucket, + ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), + Key: objectKey(input.Key), + }, + }, s3err.ErrNone + } + } + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() return nil, s3err.ErrNoSuchUpload } pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId) if err != nil { glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc() return nil, s3err.ErrNoSuchUpload } - - // check whether completedParts is more than received parts - { - partNumbers := make(map[int]struct{}, len(entries)) - for _, entry := range entries { - if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { - partNumberString := entry.Name[:len(entry.Name)-len(".part")] - partNumber, err := strconv.Atoi(partNumberString) - if err == nil { - partNumbers[partNumber] = struct{}{} + deleteEntries := []*filer_pb.Entry{} + partEntries := make(map[int][]*filer_pb.Entry, len(entries)) + entityTooSmall := false + for _, entry := range entries { + foundEntry := false + glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name) + if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) { + continue + } + partNumber, err := parsePartNumber(entry.Name) + if err != nil { + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc() + glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err) + continue + } + completedPartsByNumber, ok := completedPartMap[partNumber] + if !ok { + continue + } + for _, partETag := range completedPartsByNumber { + partETag = strings.Trim(partETag, `"`) + entryETag := hex.EncodeToString(entry.Attributes.GetMd5()) + if partETag != "" && len(partETag) == 32 && entryETag != "" { + if entryETag != partETag { + glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc() + continue } + } else { + glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc() + } + if len(entry.Chunks) == 0 { + glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc() + continue } + //there maybe multi same part, because of client retry + partEntries[partNumber] = append(partEntries[partNumber], entry) + foundEntry = true } - for _, part := range completedParts { - if _, found := partNumbers[part.PartNumber]; !found { - return nil, s3err.ErrInvalidPart + if foundEntry { + if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] && + entry.Attributes.FileSize < multiPartMinSize { + glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name) + entityTooSmall = true } + } else { + deleteEntries = append(deleteEntries, entry) } } - + if entityTooSmall { + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc() + return nil, s3err.ErrEntityTooSmall + } mime := pentry.Attributes.Mime - var finalParts []*filer_pb.FileChunk var offset int64 - - for _, entry := range entries { - if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { - partETag, found := findByPartNumber(entry.Name, completedParts) - if !found { + for _, partNumber := range completedPartNumbers { + partEntriesByNumber, ok := partEntries[partNumber] + if !ok { + glog.Errorf("part %d has no entry", partNumber) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc() + return nil, s3err.ErrInvalidPart + } + found := false + if len(partEntriesByNumber) > 1 { + slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int { + return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs) + }) + } + for _, entry := range partEntriesByNumber { + if found { + deleteEntries = append(deleteEntries, entry) + stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc() continue } - entryETag := hex.EncodeToString(entry.Attributes.GetMd5()) - if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag { - glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag) - return nil, s3err.ErrInvalidPart - } for _, chunk := range entry.GetChunks() { p := &filer_pb.FileChunk{ FileId: chunk.GetFileIdString(), @@ -137,28 +213,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa finalParts = append(finalParts, p) offset += int64(chunk.Size) } + found = true } } - entryName := filepath.Base(*input.Key) - dirName := filepath.ToSlash(filepath.Dir(*input.Key)) - if dirName == "." { - dirName = "" - } - if strings.HasPrefix(dirName, "/") { - dirName = dirName[1:] - } - dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) - - // remove suffix '/' - if strings.HasSuffix(dirName, "/") { - dirName = dirName[:len(dirName)-1] - } - + entryName, dirName := s3a.getEntryNameAndDir(input) err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } + entry.Extended[s3_constants.X_SeaweedFS_Header_Upload_Id] = []byte(*input.UploadId) for k, v := range pentry.Extended { if k != "key" { entry.Extended[k] = v @@ -186,6 +250,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa }, } + for _, deleteEntry := range deleteEntries { + //delete unused part data + glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name) + if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil { + glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err) + } + } if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil { glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err) } @@ -193,29 +264,33 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa return } -func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) { - partNumber, formatErr := strconv.Atoi(fileName[:4]) - if formatErr != nil { - return +func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) { + entryName := filepath.Base(*input.Key) + dirName := filepath.ToSlash(filepath.Dir(*input.Key)) + if dirName == "." { + dirName = "" } - x := sort.Search(len(parts), func(i int) bool { - return parts[i].PartNumber >= partNumber - }) - if x >= len(parts) { - return + if strings.HasPrefix(dirName, "/") { + dirName = dirName[1:] } - if parts[x].PartNumber != partNumber { - return + dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) + + // remove suffix '/' + if strings.HasSuffix(dirName, "/") { + dirName = dirName[:len(dirName)-1] } - y := 0 - for i, part := range parts[x:] { - if part.PartNumber == partNumber { - y = i - } else { - break - } + return entryName, dirName +} + +func parsePartNumber(fileName string) (int, error) { + var partNumberString string + index := strings.Index(fileName, "_") + if index != -1 { + partNumberString = fileName[:index] + } else { + partNumberString = fileName[:len(fileName)-len(multipartExt)] } - return parts[x+y].ETag, true + return strconv.Atoi(partNumberString) } func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) { @@ -331,7 +406,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP StorageClass: aws.String("STANDARD"), } - entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts)) + entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts)) if err != nil { glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err) return nil, s3err.ErrNoSuchUpload @@ -343,9 +418,8 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP output.IsTruncated = aws.Bool(!isLast) for _, entry := range entries { - if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory { - partNumberString := entry.Name[:len(entry.Name)-len(".part")] - partNumber, err := strconv.Atoi(partNumberString) + if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory { + partNumber, err := parsePartNumber(entry.Name) if err != nil { glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err) continue diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go index e76d903b8..7f75a40de 100644 --- a/weed/s3api/filer_multipart_test.go +++ b/weed/s3api/filer_multipart_test.go @@ -50,88 +50,27 @@ func TestListPartsResult(t *testing.T) { } -func Test_findByPartNumber(t *testing.T) { - type args struct { - fileName string - parts []CompletedPart - } - - parts := []CompletedPart{ - { - ETag: "xxx", - PartNumber: 1, - }, - { - ETag: "lll", - PartNumber: 1, - }, - { - ETag: "yyy", - PartNumber: 3, - }, - { - ETag: "zzz", - PartNumber: 5, - }, - } - +func Test_parsePartNumber(t *testing.T) { tests := []struct { - name string - args args - wantEtag string - wantFound bool + name string + fileName string + partNum int }{ { "first", - args{ - "0001.part", - parts, - }, - "lll", - true, + "0001_uuid.part", + 1, }, { "second", - args{ - "0002.part", - parts, - }, - "", - false, - }, - { - "third", - args{ - "0003.part", - parts, - }, - "yyy", - true, - }, - { - "fourth", - args{ - "0004.part", - parts, - }, - "", - false, - }, - { - "fifth", - args{ - "0005.part", - parts, - }, - "zzz", - true, + "0002.part", + 2, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts) - assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts) - assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts) + partNumber, _ := parsePartNumber(tt.fileName) + assert.Equalf(t, tt.partNum, partNumber, "parsePartNumber(%v)", tt.fileName) }) } } diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go index 30a878ccb..8e4a2f8c7 100644 --- a/weed/s3api/s3_constants/header.go +++ b/weed/s3api/s3_constants/header.go @@ -39,6 +39,7 @@ const ( AmzTagCount = "x-amz-tagging-count" X_SeaweedFS_Header_Directory_Key = "x-seaweedfs-is-directory-key" + X_SeaweedFS_Header_Upload_Id = "X-Seaweedfs-Upload-Id" // S3 ACL headers AmzCannedAcl = "X-Amz-Acl" diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index bb3939571..04e1e00a4 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -351,7 +351,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr } -// DeleteBucketMetricsConfiguration Delete Bucket Lifecycle +// DeleteBucketLifecycleHandler Delete Bucket Lifecycle // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go index 8dc33f213..8d13fe17e 100644 --- a/weed/s3api/s3api_object_copy_handlers.go +++ b/weed/s3api/s3api_object_copy_handlers.go @@ -2,16 +2,17 @@ package s3api import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - "modernc.org/strutil" "net/http" "net/url" "strconv" "strings" "time" + "modernc.org/strutil" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -170,8 +171,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req rangeHeader := r.Header.Get("x-amz-copy-source-range") - dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID) + dstUrl := s3a.genPartUploadUrl(dstBucket, uploadID, partID) srcUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject)) diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 187022079..6fecdcf2d 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" + "github.com/google/uuid" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" @@ -247,8 +248,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID) - uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID) + uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID) if partID == 1 && r.Header.Get("Content-Type") == "" { dataReader = mimeDetect(r, dataReader) @@ -271,6 +271,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder) } +func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string { + return fmt.Sprintf("http://%s%s/%s/%04d_%s.part", + s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString()) +} + // Generate uploadID hash string from object func (s3a *S3ApiServer) generateUploadID(object string) string { if strings.HasPrefix(object, "/") { diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index f332da856..b00e4630d 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -374,7 +374,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } if cursor.maxKeys <= 0 { cursor.isTruncated = true - return + continue } entry := resp.Entry nextMarker = entry.Name diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 356761f30..9880afee0 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -91,6 +91,7 @@ type FilerServer struct { secret security.SigningKey filer *filer.Filer filerGuard *security.Guard + volumeGuard *security.Guard grpcDialOption grpc.DialOption // metrics read from the master @@ -113,6 +114,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + volumeSigningKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + + volumeReadSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") + v.SetDefault("cors.allowed_origins.values", "*") allowedOrigins := v.GetString("cors.allowed_origins.values") @@ -145,6 +154,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Cipher = option.Cipher // we do not support IP whitelist right now fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec) fs.checkWithMaster() diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index db46f00b3..e04994569 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -2,6 +2,7 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" @@ -20,6 +21,26 @@ func init() { }} } +func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { + encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite) + + if encodedJwt == "" { + return + } + + r.Header.Set("Authorization", "BEARER "+string(encodedJwt)) +} + +func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string { + var encodedJwt security.EncodedJwt + if isWrite { + encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId) + } else { + encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId) + } + return string(encodedJwt) +} + func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) { urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index c139060e4..83411051d 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -15,6 +15,7 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -261,7 +262,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to prepare stream content %s: %v", r.URL, err) @@ -277,3 +278,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) }, nil }) } + +func (fs *FilerServer) maybeGetVolumeReadJwtAuthorizationToken(fileId string) string { + return string(security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)) +} diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go index 2aede2d50..5839a6a73 100644 --- a/weed/server/master_grpc_server_assign.go +++ b/weed/server/master_grpc_server_assign.go @@ -97,6 +97,9 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest continue } dn := dnList.Head() + if dn == nil { + continue + } var replicas []*master_pb.Location for _, r := range dnList.Rest() { replicas = append(replicas, &master_pb.Location{ diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index c5e059f21..9dc6351a4 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -149,7 +149,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } else { ms.maybeAddJwtAuthorization(w, fid, true) dn := dnList.Head() - + if dn == nil { + continue + } writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) return } diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 9b6a64e6f..b724f16f9 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -315,8 +315,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) if !takeAction { - // adjust free volume count - dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + // adjust volume count + dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++ break } @@ -349,8 +349,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co return err } - // adjust free volume count - dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- + // adjust volume count + dst.dataNode.DiskInfos[replica.info.DiskType].VolumeCount++ break } } diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index f61f68e08..83391f047 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -241,7 +241,13 @@ var ( Name: "request_total", Help: "Counter of s3 requests.", }, []string{"type", "code", "bucket"}) - + S3HandlerCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: "s3", + Name: "handler_total", + Help: "Counter of s3 server handlers.", + }, []string{"type"}) S3RequestHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: Namespace, @@ -292,6 +298,7 @@ func init() { Gather.MustRegister(VolumeServerResourceGauge) Gather.MustRegister(S3RequestCounter) + Gather.MustRegister(S3HandlerCounter) Gather.MustRegister(S3RequestHistogram) Gather.MustRegister(S3TimeToFirstByteHistogram) } diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go index f97796e3b..c0a6e99be 100644 --- a/weed/stats/metrics_names.go +++ b/weed/stats/metrics_names.go @@ -40,6 +40,17 @@ const ( ErrorReadInternal = "read.internal.error" ErrorWriteEntry = "write.entry.failed" RepeatErrorUploadContent = "upload.content.repeat.failed" + ErrorChunkAssign = "chunkAssign.failed" ErrorReadCache = "read.cache.failed" ErrorReadStream = "read.stream.failed" + + // s3 handler + ErrorCompletedNoSuchUpload = "errorCompletedNoSuchUpload" + ErrorCompleteEntityTooSmall = "errorCompleteEntityTooSmall" + ErrorCompletedPartEmpty = "errorCompletedPartEmpty" + ErrorCompletedPartNumber = "errorCompletedPartNumber" + ErrorCompletedPartNotFound = "errorCompletedPartNotFound" + ErrorCompletedEtagInvalid = "errorCompletedEtagInvalid" + ErrorCompletedEtagMismatch = "errorCompletedEtagMismatch" + ErrorCompletedPartEntryMismatch = "errorCompletedPartEntryMismatch" ) diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 050ded613..68ffbd7e7 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -81,13 +81,20 @@ func (df *DiskFile) Close() error { if df.File == nil { return nil } - if err := df.Sync(); err != nil { - return err + err := df.Sync() + var err1 error + if df.File != nil { + // always try to close + err1 = df.File.Close() } - if err := df.File.Close(); err != nil { + // assume closed + df.File = nil + if err != nil { return err } - df.File = nil + if err1 != nil { + return err1 + } return nil } diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go index a3473c677..04c5e8aeb 100644 --- a/weed/topology/volume_growth_test.go +++ b/weed/topology/volume_growth_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" "testing" "github.com/seaweedfs/seaweedfs/weed/sequence" @@ -419,11 +420,13 @@ func TestPickForWrite(t *testing.T) { Rack: "", DataNode: "", } + v := util.GetViper() + v.Set("master.volume_growth.threshold", 0.9) for _, rpStr := range []string{"001", "010", "100"} { rp, _ := super_block.NewReplicaPlacementFromString(rpStr) vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType) volumeGrowOption.ReplicaPlacement = rp - for _, dc := range []string{"", "dc1", "dc2", "dc3"} { + for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} { volumeGrowOption.DataCenter = dc for _, r := range []string{""} { volumeGrowOption.Rack = r @@ -432,8 +435,13 @@ func TestPickForWrite(t *testing.T) { continue } volumeGrowOption.DataNode = dn - fileId, count, _, _, err := topo.PickForWrite(1, volumeGrowOption, vl) - if err != nil { + fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl) + if dc == "dc0" { + if err == nil || count != 0 || !shouldGrow { + fmt.Println(dc, r, dn, "pick for write should be with error") + t.Fail() + } + } else if err != nil { fmt.Println(dc, r, dn, "pick for write error :", err) t.Fail() } else if count == 0 { @@ -442,6 +450,9 @@ func TestPickForWrite(t *testing.T) { } else if len(fileId) == 0 { fmt.Println(dc, r, dn, "pick for write file id is empty") t.Fail() + } else if shouldGrow { + fmt.Println(dc, r, dn, "pick for write error : not should grow") + t.Fail() } } } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 278978292..d04552d35 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -234,13 +234,18 @@ func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) { } func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool { - for _, dn := range vl.vid2location[vid].list { - if v, getError := dn.GetVolumesById(vid); getError == nil { - if v.ReadOnly { - return false + if location, ok := vl.vid2location[vid]; ok { + for _, dn := range location.list { + if v, getError := dn.GetVolumesById(vid); getError == nil { + if v.ReadOnly { + return false + } } } + } else { + return false } + return true } @@ -301,7 +306,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() { shouldGrow = true } - return vid, count, locationList, shouldGrow, nil + return vid, count, locationList.Copy(), shouldGrow, nil } return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!") } @@ -336,7 +341,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vi return } } - return vid, count, locationList, shouldGrow, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode) + return vid, count, locationList, true, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode) } func (vl *VolumeLayout) HasGrowRequest() bool { diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index c26f77104..127ad67eb 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -28,6 +28,9 @@ func (dnll *VolumeLocationList) Copy() *VolumeLocationList { func (dnll *VolumeLocationList) Head() *DataNode { //mark first node as master volume + if dnll.Length() == 0 { + return nil + } return dnll.list[0] } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index d1505f673..7b3ac4bc4 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -53,11 +53,15 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout func Get(url string) ([]byte, bool, error) { + return GetAuthenticated(url, "") +} +func GetAuthenticated(url, jwt string) ([]byte, bool, error) { request, err := http.NewRequest("GET", url, nil) if err != nil { return nil, true, err } + maybeAddAuth(request, jwt) request.Header.Add("Accept-Encoding", "gzip") response, err := client.Do(request) @@ -101,11 +105,15 @@ func Head(url string) (http.Header, error) { return r.Header, nil } -func Delete(url string, jwt string) error { - req, err := http.NewRequest("DELETE", url, nil) +func maybeAddAuth(req *http.Request, jwt string) { if jwt != "" { req.Header.Set("Authorization", "BEARER "+string(jwt)) } +} + +func Delete(url string, jwt string) error { + req, err := http.NewRequest("DELETE", url, nil) + maybeAddAuth(req, jwt) if err != nil { return err } @@ -133,9 +141,7 @@ func Delete(url string, jwt string) error { func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { req, err := http.NewRequest("DELETE", url, nil) - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } + maybeAddAuth(req, jwt) if err != nil { return } @@ -193,9 +199,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head return "", nil, nil, err } - if len(jwt) > 0 { - req.Header.Set("Authorization", "BEARER "+jwt) - } + maybeAddAuth(req, jwt) response, err := client.Do(req) if err != nil { @@ -229,7 +233,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC if cipherKey != nil { var n int - _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { n = copy(buf, data) }) return int64(n), err @@ -298,11 +302,16 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC } func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) +} + +func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { - return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } req, err := http.NewRequest("GET", fileUrl, nil) + maybeAddAuth(req, jwt) if err != nil { return false, err } @@ -354,8 +363,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is } -func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { - encryptedData, retryable, err := Get(fileUrl) +func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { + encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) } @@ -392,9 +401,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt req.Header.Add("Accept-Encoding", "gzip") } - if len(jwt) > 0 { - req.Header.Set("Authorization", "BEARER "+jwt) - } + maybeAddAuth(req, jwt) r, err := client.Do(req) if err != nil { diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go index 21eed4b43..befb0389c 100644 --- a/weed/util/skiplist/skiplist.go +++ b/weed/util/skiplist/skiplist.go @@ -26,7 +26,7 @@ type SkipList struct { // elementCount int } -// NewSeedEps returns a new empty, initialized Skiplist. +// NewSeed returns a new empty, initialized Skiplist. // Given a seed, a deterministic height/list behaviour can be achieved. // Eps is used to compare keys given by the ExtractKey() function on equality. func NewSeed(seed int64, listStore ListStore) *SkipList { |
