aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunk_manifest.go5
-rw-r--r--weed/filer/filer_notify_append.go8
-rw-r--r--weed/filer/reader_cache.go4
-rw-r--r--weed/filer/stream.go5
4 files changed, 15 insertions, 7 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 7ea2f0353..e9ae1800c 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -120,7 +121,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err
}
- return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
+ return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
@@ -132,7 +133,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri
for _, urlString := range urlStrings {
var localProcessed int
var writeErr error
- shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped {
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 66ce24871..3c9a3496c 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -77,7 +77,13 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
PairMap: nil,
Jwt: assignResult.Auth,
}
- uploadResult, err := operation.UploadData(data, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+
+ uploadResult, err := uploader.UploadData(data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 7be54b193..fea2bbc89 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -2,7 +2,6 @@ package filer
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
@@ -10,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type ReaderCache struct {
@@ -171,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize)
- _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil {
mem.Free(s.data)
s.data = nil
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 23a853b9a..fdb443b53 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var getLookupFileIdBackoffSchedule = []time.Duration{
@@ -194,7 +195,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
return err
}
- n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+ n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -350,7 +351,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {