-rw-r--r--  unmaintained/change_superblock/change_superblock.go  3
-rw-r--r--  unmaintained/diff_volume_servers/diff_volume_servers.go  4
-rw-r--r--  unmaintained/fix_dat/fix_dat.go  3
-rw-r--r--  unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go  3
-rw-r--r--  unmaintained/remove_duplicate_fids/remove_duplicate_fids.go  2
-rw-r--r--  unmaintained/repeated_vacuum/repeated_vacuum.go  14
-rw-r--r--  unmaintained/s3/presigned_put/presigned_put.go  6
-rw-r--r--  unmaintained/see_dat/see_dat.go  2
-rw-r--r--  unmaintained/see_idx/see_idx.go  3
-rw-r--r--  unmaintained/see_log_entry/see_log_entry.go  2
-rw-r--r--  unmaintained/see_meta/see_meta.go  2
-rw-r--r--  unmaintained/stream_read_volume/stream_read_volume.go  2
-rw-r--r--  unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go  16
-rw-r--r--  unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go  15
-rw-r--r--  unmaintained/volume_tailer/volume_tailer.go  2
-rw-r--r--  weed/command/benchmark.go  5
-rw-r--r--  weed/command/download.go  9
-rw-r--r--  weed/command/filer_copy.go  21
-rw-r--r--  weed/command/scaffold/security.toml  8
-rw-r--r--  weed/command/update.go  5
-rw-r--r--  weed/filer/filechunk_manifest.go  5
-rw-r--r--  weed/filer/filer_notify_append.go  8
-rw-r--r--  weed/filer/reader_cache.go  4
-rw-r--r--  weed/filer/stream.go  5
-rw-r--r--  weed/mount/weedfs_write.go  6
-rw-r--r--  weed/mq/broker/broker_topic_partition_read_write.go  3
-rw-r--r--  weed/mq/broker/broker_write.go  8
-rw-r--r--  weed/mq/client/cmd/weed_pub_kv/publisher_kv.go  2
-rw-r--r--  weed/mq/client/cmd/weed_pub_record/publisher_record.go  2
-rw-r--r--  weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go  2
-rw-r--r--  weed/mq/client/cmd/weed_sub_record/subscriber_record.go  2
-rw-r--r--  weed/operation/chunked_file.go  5
-rw-r--r--  weed/operation/needle_parse_test.go  16
-rw-r--r--  weed/operation/submit.go  24
-rw-r--r--  weed/operation/upload_content.go  84
-rw-r--r--  weed/replication/repl_util/replication_util.go  4
-rw-r--r--  weed/replication/sink/filersink/fetch_write.go  11
-rw-r--r--  weed/replication/source/filer_source.go  5
-rw-r--r--  weed/s3api/s3api_acl_helper.go  4
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go  4
-rw-r--r--  weed/s3api/s3api_object_handlers.go  4
-rw-r--r--  weed/s3api/s3api_object_handlers_copy.go  9
-rw-r--r--  weed/s3api/s3api_server.go  11
-rw-r--r--  weed/server/common.go  7
-rw-r--r--  weed/server/filer_server_handlers_proxy.go  17
-rw-r--r--  weed/server/filer_server_handlers_write.go  3
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  8
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go  8
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go  8
-rw-r--r--  weed/server/master_server.go  3
-rw-r--r--  weed/server/master_server_handlers_admin.go  7
-rw-r--r--  weed/server/volume_grpc_remote.go  13
-rw-r--r--  weed/server/volume_server_handlers_read.go  11
-rw-r--r--  weed/server/webdav_server.go  7
-rw-r--r--  weed/shell/command_fs_merge_volumes.go  21
-rw-r--r--  weed/shell/command_s3_clean_uploads.go  3
-rw-r--r--  weed/shell/command_volume_fsck.go  5
-rw-r--r--  weed/topology/store_replicate.go  10
-rw-r--r--  weed/util/http/client/http_client.go  201
-rw-r--r--  weed/util/http/client/http_client_interface.go  16
-rw-r--r--  weed/util/http/client/http_client_name.go  14
-rw-r--r--  weed/util/http/client/http_client_name_string.go  23
-rw-r--r--  weed/util/http/client/http_client_opt.go  18
-rw-r--r--  weed/util/http/http_global_client_init.go  27
-rw-r--r--  weed/util/http/http_global_client_util.go (renamed from weed/util/http_util.go)  57
-rw-r--r--  weed/weed.go  2
66 files changed, 646 insertions, 198 deletions
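Taken together, these changes replace ad-hoc http.Client instances and the old util.Get/util.Delete/util.DownloadFile helpers with one shared client owned by weed/util/http, plus an operation.Uploader wrapper for uploads. A minimal Go sketch of the pattern the updated commands follow (the master URL is only an illustrative placeholder, not taken from the commit):

package main

import (
	"fmt"
	"log"

	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

func main() {
	// Each updated command initializes the shared client once, right after flag.Parse().
	util_http.InitGlobalHttpClient()

	// util_http.Get keeps the old util.Get shape: body bytes, a second value
	// (ignored here, as at the call sites below), and an error.
	body, _, err := util_http.Get("http://localhost:9333/dir/status")
	if err != nil {
		log.Fatalf("get: %v", err)
	}
	fmt.Printf("read %d bytes\n", len(body))
}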
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go
index 27876272c..52368f8cd 100644
--- a/unmaintained/change_superblock/change_superblock.go
+++ b/unmaintained/change_superblock/change_superblock.go
@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -40,6 +41,8 @@ that has those volumes.
*/
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
+
fileName := strconv.Itoa(*fixVolumeId)
if *fixVolumeCollection != "" {
fileName = *fixVolumeCollection + "_" + fileName
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go
index 9433af147..e289fefe8 100644
--- a/unmaintained/diff_volume_servers/diff_volume_servers.go
+++ b/unmaintained/diff_volume_servers/diff_volume_servers.go
@@ -20,6 +20,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -40,7 +41,8 @@ var (
*/
func main() {
flag.Parse()
-
+ util_http.InitGlobalHttpClient()
+
util.LoadSecurityConfiguration()
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go
index 760fc79ca..164b5b238 100644
--- a/unmaintained/fix_dat/fix_dat.go
+++ b/unmaintained/fix_dat/fix_dat.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -36,6 +37,8 @@ The .idx has all correct offsets.
*/
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
+
fileName := strconv.Itoa(*fixVolumeId)
if *fixVolumeCollection != "" {
fileName = *fixVolumeCollection + "_" + fileName
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index 2eeb5d6f9..2b63d5d59 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -23,8 +24,8 @@ var (
)
func main() {
-
flag.Parse()
+ util_http.InitGlobalHttpClient()
if *isWrite {
startGenerateMetadata()
diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
index 354707c81..cfac97432 100644
--- a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
+++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -71,6 +72,7 @@ func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset in
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
vid := needle.VolumeId(*volumeId)
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 65ec94627..1f89bd902 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -25,6 +26,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
@@ -34,7 +36,7 @@ func main() {
go func() {
for {
println("vacuum threshold", *garbageThreshold)
- _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", pb.ServerAddress(*master).ToHttpAddress(), *garbageThreshold))
+ _, _, err := util_http.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", pb.ServerAddress(*master).ToHttpAddress(), *garbageThreshold))
if err != nil {
log.Fatalf("vacuum: %v", err)
}
@@ -47,7 +49,7 @@ func main() {
assignResult, targetUrl := genFile(grpcDialOption, i)
- util.Delete(targetUrl, string(assignResult.Auth))
+ util_http.Delete(targetUrl, string(assignResult.Auth))
}
@@ -76,7 +78,13 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st
PairMap: nil,
Jwt: assignResult.Auth,
}
- _, err = operation.UploadData(data, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ log.Fatalf("upload: %v", err)
+ }
+
+ _, err = uploader.UploadData(data, uploadOption)
if err != nil {
log.Fatalf("upload: %v", err)
}
diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go
index ba135ff25..1e591dff2 100644
--- a/unmaintained/s3/presigned_put/presigned_put.go
+++ b/unmaintained/s3/presigned_put/presigned_put.go
@@ -7,10 +7,10 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
- "github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"strings"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// Downloads an item from an S3 Bucket in the region configured in the shared config
@@ -21,6 +21,8 @@ import (
// For this exampl to work, the domainName is needd
// weed s3 -domainName=localhost
func main() {
+ util_http.InitGlobalHttpClient()
+
h := md5.New()
content := strings.NewReader(stringContent)
content.WriteTo(h)
@@ -64,7 +66,7 @@ func main() {
fmt.Printf("error put request: %v\n", err)
return
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
fmt.Printf("response: %+v\n", resp)
}
diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go
index 1b2f0bb6a..a60e45760 100644
--- a/unmaintained/see_dat/see_dat.go
+++ b/unmaintained/see_dat/see_dat.go
@@ -10,6 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -40,6 +41,7 @@ func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset in
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
vid := needle.VolumeId(*volumeId)
diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go
index 856d96d54..87f00ebb0 100644
--- a/unmaintained/see_idx/see_idx.go
+++ b/unmaintained/see_idx/see_idx.go
@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -27,6 +28,8 @@ This is to see content in .idx files.
*/
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
+
fileName := strconv.Itoa(*fixVolumeId)
if *fixVolumeCollection != "" {
fileName = *fixVolumeCollection + "_" + fileName
diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go
index d5deff283..42a63476b 100644
--- a/unmaintained/see_log_entry/see_log_entry.go
+++ b/unmaintained/see_log_entry/see_log_entry.go
@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -20,6 +21,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
dst, err := os.OpenFile(*logdataFile, os.O_RDONLY, 0644)
if err != nil {
diff --git a/unmaintained/see_meta/see_meta.go b/unmaintained/see_meta/see_meta.go
index 6fc88358c..da78f0918 100644
--- a/unmaintained/see_meta/see_meta.go
+++ b/unmaintained/see_meta/see_meta.go
@@ -11,6 +11,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -19,6 +20,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
dst, err := os.OpenFile(*metaFile, os.O_RDONLY, 0644)
if err != nil {
diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go
index 2737962f2..cfdb36815 100644
--- a/unmaintained/stream_read_volume/stream_read_volume.go
+++ b/unmaintained/stream_read_volume/stream_read_volume.go
@@ -13,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -23,6 +24,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
util.LoadSecurityConfiguration()
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
index 7700a6dce..6dc703dbc 100644
--- a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
+++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
@@ -13,6 +13,7 @@ import (
"strings"
"sync"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -30,8 +31,8 @@ type stat struct {
}
func main() {
-
flag.Parse()
+ util_http.InitGlobalHttpClient()
data := make([]byte, *size)
println("data len", len(data))
@@ -43,16 +44,12 @@ func main() {
go func(x int) {
defer wg.Done()
- client := &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))
for t := 0; t < *times; t++ {
for f := 0; f < *fileCount; f++ {
fn := r.Intn(*fileCount)
- if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
+ if size, err := uploadFileToFiler(data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
statsChan <- stat{
size: size,
}
@@ -93,7 +90,7 @@ func main() {
}
-func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) {
+func uploadFileToFiler(data []byte, filename, destination string) (size int64, err error) {
if !strings.HasSuffix(destination, "/") {
destination = destination + "/"
@@ -116,10 +113,13 @@ func uploadFileToFiler(client *http.Client, data []byte, filename, destination s
uri := destination + filename
request, err := http.NewRequest(http.MethodPost, uri, body)
+ if err != nil {
+ return 0, fmt.Errorf("http POST %s: %v", uri, err)
+ }
request.Header.Set("Content-Type", writer.FormDataContentType())
// request.Close = true // can not use this, which do not reuse http connection, impacting filer->volume also.
- resp, err := client.Do(request)
+ resp, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
return 0, fmt.Errorf("http POST %s: %v", uri, err)
} else {
diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
index c8d36053b..1cdcad0b3 100644
--- a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
+++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
@@ -14,6 +14,7 @@ import (
"strings"
"sync"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -30,8 +31,8 @@ type stat struct {
}
func main() {
-
flag.Parse()
+ util_http.InitGlobalHttpClient()
var fileNames []string
@@ -51,8 +52,6 @@ func main() {
for x := 0; x < *concurrency; x++ {
wg.Add(1)
- client := &http.Client{}
-
go func() {
defer wg.Done()
rand.Shuffle(len(fileNames), func(i, j int) {
@@ -60,7 +59,7 @@ func main() {
})
for t := 0; t < *times; t++ {
for _, filename := range fileNames {
- if size, err := uploadFileToFiler(client, filename, *destination); err == nil {
+ if size, err := uploadFileToFiler(filename, *destination); err == nil {
statsChan <- stat{
size: size,
}
@@ -99,7 +98,7 @@ func main() {
}
-func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) {
+func uploadFileToFiler(filename, destination string) (size int64, err error) {
file, err := os.Open(filename)
if err != nil {
panic(err)
@@ -131,9 +130,13 @@ func uploadFileToFiler(client *http.Client, filename, destination string) (size
uri := destination + file.Name()
request, err := http.NewRequest(http.MethodPost, uri, body)
+ if err != nil {
+ return 0, fmt.Errorf("http POST %s: %v", uri, err)
+ }
+
request.Header.Set("Content-Type", writer.FormDataContentType())
- resp, err := client.Do(request)
+ resp, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
return 0, fmt.Errorf("http POST %s: %v", uri, err)
} else {
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
index c210db81f..a75a095d4 100644
--- a/unmaintained/volume_tailer/volume_tailer.go
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
util2 "github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/tools/godoc/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -24,6 +25,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
util2.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client")
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 0cd9d31c5..bc7ee1292 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type BenchmarkOptions struct {
@@ -214,7 +215,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if isSecure {
jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
}
- if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
+ if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
} else {
s.failed++
@@ -295,7 +296,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
- bytes, _, err = util.Get(url)
+ bytes, _, err = util_http.Get(url)
if err == nil {
break
}
diff --git a/weed/command/download.go b/weed/command/download.go
index 1032dcb62..1b7098824 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -63,11 +64,11 @@ func downloadToFile(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpti
if lookupError != nil {
return lookupError
}
- filename, _, rc, err := util.DownloadFile(fileUrl, jwt)
+ filename, _, rc, err := util_http.DownloadFile(fileUrl, jwt)
if err != nil {
return err
}
- defer util.CloseResponse(rc)
+ defer util_http.CloseResponse(rc)
if filename == "" {
filename = fileId
}
@@ -116,10 +117,10 @@ func fetchContent(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption
return "", nil, lookupError
}
var rc *http.Response
- if filename, _, rc, e = util.DownloadFile(fileUrl, jwt); e != nil {
+ if filename, _, rc, e = util_http.DownloadFile(fileUrl, jwt); e != nil {
return "", nil, e
}
- defer util.CloseResponse(rc)
+ defer util_http.CloseResponse(rc)
content, e = io.ReadAll(rc.Body)
return
}
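The download command above now pairs util_http.DownloadFile with util_http.CloseResponse. A minimal sketch of that pairing, using a hypothetical fetch helper (the jwt may be empty when no authorization is required, as in the filer source later in this diff):

package example

import (
	"io"

	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

// fetch downloads a file URL through the shared client and releases the
// response with the helper, mirroring downloadToFile/fetchContent above.
func fetch(fileUrl, jwt string) ([]byte, error) {
	_, _, resp, err := util_http.DownloadFile(fileUrl, jwt)
	if err != nil {
		return nil, err
	}
	defer util_http.CloseResponse(resp)
	return io.ReadAll(resp.Body)
}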
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 59cd5491d..8f6cb233e 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -344,7 +344,12 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err
}
- finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return uploaderErr
+ }
+
+ finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
@@ -423,7 +428,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
<-concurrentChunks
}()
- fileId, uploadResult, err, _ := operation.UploadWithRetry(
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err)
+ return
+ }
+
+ fileId, uploadResult, err, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
@@ -535,8 +546,12 @@ func detectMimeType(f *os.File) string {
}
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return nil, fmt.Errorf("upload data: %v", uploaderErr)
+ }
- finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ finalFileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
worker,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
index 687854264..113e5b016 100644
--- a/weed/command/scaffold/security.toml
+++ b/weed/command/scaffold/security.toml
@@ -94,10 +94,14 @@ allowed_commonNames = "" # comma-separated SSL certificate common names
[grpc.client]
cert = ""
key = ""
-# Note: work in progress!
-# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
+
+# https client for master|volume|filer|etc connection
+# It is necessary that the parameters [https.volume]|[https.master]|[https.filer] are set
[https.client]
enabled = true
+cert = ""
+key = ""
+ca = ""
# volume server https options
[https.volume]
diff --git a/weed/command/update.go b/weed/command/update.go
index 314a903f2..4f2b66b2e 100644
--- a/weed/command/update.go
+++ b/weed/command/update.go
@@ -21,6 +21,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
"golang.org/x/net/context/ctxhttp"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
//copied from https://github.com/restic/restic/tree/master/internal/selfupdate
@@ -198,7 +199,7 @@ func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (R
if err != nil {
return Release{}, err
}
- defer util.CloseResponse(res)
+ defer util_http.CloseResponse(res)
if res.StatusCode != http.StatusOK {
content := res.Header.Get("Content-Type")
@@ -258,7 +259,7 @@ func getGithubData(ctx context.Context, url string) ([]byte, error) {
if err != nil {
return nil, err
}
- defer util.CloseResponse(res)
+ defer util_http.CloseResponse(res)
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 7ea2f0353..e9ae1800c 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -120,7 +121,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err
}
- return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
+ return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
@@ -132,7 +133,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri
for _, urlString := range urlStrings {
var localProcessed int
var writeErr error
- shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped {
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 66ce24871..3c9a3496c 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -77,7 +77,13 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
PairMap: nil,
Jwt: assignResult.Auth,
}
- uploadResult, err := operation.UploadData(data, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+
+ uploadResult, err := uploader.UploadData(data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 7be54b193..fea2bbc89 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -2,7 +2,6 @@ package filer
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
@@ -10,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type ReaderCache struct {
@@ -171,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize)
- _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil {
mem.Free(s.data)
s.data = nil
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 23a853b9a..fdb443b53 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var getLookupFileIdBackoffSchedule = []time.Duration{
@@ -194,7 +195,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
return err
}
- n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+ n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -350,7 +351,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/mount/weedfs_write.go b/weed/mount/weedfs_write.go
index 4c8470245..77ad01b89 100644
--- a/weed/mount/weedfs_write.go
+++ b/weed/mount/weedfs_write.go
@@ -14,8 +14,12 @@ import (
func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return
+ }
- fileId, uploadResult, err, data := operation.UploadWithRetry(
+ fileId, uploadResult, err, data := uploader.UploadWithRetry(
wfs,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index cb55e2032..c8a08eb18 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -13,6 +13,7 @@ import (
"math"
"sync/atomic"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
@@ -131,7 +132,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
for _, urlString := range urlStrings {
// TODO optimization opportunity: reuse the buffer
var data []byte
- if data, _, err = util.Get(urlString); err == nil {
+ if data, _, err = util_http.Get(urlString); err == nil {
processed = true
if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
return
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
index 896f0ee75..9999529fb 100644
--- a/weed/mq/broker/broker_write.go
+++ b/weed/mq/broker/broker_write.go
@@ -55,7 +55,13 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
reader := util.NewBytesReader(data)
- fileId, uploadResult, err, _ = operation.UploadWithRetry(
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return
+ }
+
+ fileId, uploadResult, err, _ = uploader.UploadWithRetry(
b,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
index 096b355a1..3ab3cb251 100644
--- a/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
+++ b/weed/mq/client/cmd/weed_pub_kv/publisher_kv.go
@@ -9,6 +9,7 @@ import (
"strings"
"sync"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -45,6 +46,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
config := &pub_client.PublisherConfiguration{
Topic: topic.NewTopic(*namespace, *t),
diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
index a5fbd455e..f340dd1c8 100644
--- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go
+++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
@@ -11,6 +11,7 @@ import (
"strings"
"sync"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -86,6 +87,7 @@ func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
recordType := schema.RecordTypeBegin().
WithField("key", schema.TypeBytes).
diff --git a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
index adcdda04c..fa0e10579 100644
--- a/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
+++ b/weed/mq/client/cmd/weed_sub_kv/subscriber_kv.go
@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"strings"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -23,6 +24,7 @@ var (
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
subscriberConfig := &sub_client.SubscriberConfiguration{
ClientId: fmt.Sprintf("client-%d", *clientId),
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index 53eb4f15b..914021dbc 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -13,6 +13,7 @@ import (
"google.golang.org/protobuf/proto"
"strings"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -49,6 +50,7 @@ func FromSchemaRecordValue(recordValue *schema_pb.RecordValue) *MyRecord {
func main() {
flag.Parse()
+ util_http.InitGlobalHttpClient()
subscriberConfig := &sub_client.SubscriberConfiguration{
ClientId: fmt.Sprintf("client-%d", *clientId),
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 02faf9904..50313a670 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
}
- resp, err := util.Do(req)
+ resp, err := util_http.Do(req)
if err != nil {
return written, err
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 07b0153a9..b4bac5976 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size.
*/
func TestCreateNeedleFromRequest(t *testing.T) {
- mc := &MockClient{}
- tmp := HttpClient
- HttpClient = mc
- defer func() {
- HttpClient = tmp
- }()
+ mockClient := &MockClient{}
+ uploader := newUploader(mockClient)
{
- mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption)
+ uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
}
{
- mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- Upload(bytes.NewReader(gzippedData), uploadOption)
+ uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
}
/*
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 57bd81b14..516478dbe 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
PairMap: nil,
Jwt: jwt,
}
- ret, e, _ := Upload(fi.Reader, uploadOption)
+
+ uploader, e := NewUploader()
+ if e != nil {
+ return 0, e
+ }
+
+ ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
PairMap: nil,
Jwt: jwt,
}
- uploadResult, uploadError, _ := Upload(reader, uploadOption)
+
+ uploader, uploaderError := NewUploader()
+ if uploaderError != nil {
+ return 0, uploaderError
+ }
+
+ uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
PairMap: nil,
Jwt: jwt,
}
- _, e = UploadData(buf, uploadOption)
+
+ uploader, e := NewUploader()
+ if e != nil {
+ return e
+ }
+
+ _, e = uploader.UploadData(buf, uploadOption)
return e
}
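The submit.go hunks above show the call-site shape this commit converges on: construct (or reuse) the shared Uploader, then call its methods instead of the removed package-level Upload/UploadData functions. A minimal sketch with a hypothetical uploadBytes helper, assuming the UploadOption has been prepared as at the surrounding call sites:

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/operation"
)

// uploadBytes obtains the shared Uploader and sends data with UploadData,
// replacing a former operation.UploadData(data, option) call.
func uploadBytes(data []byte, option *operation.UploadOption) error {
	uploader, err := operation.NewUploader()
	if err != nil {
		return fmt.Errorf("new uploader: %v", err)
	}
	if _, err := uploader.UploadData(data, option); err != nil {
		return fmt.Errorf("upload to %s: %v", option.UploadUrl, err)
	}
	return nil
}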
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 6c6aec1b5..8b223e769 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -9,7 +9,7 @@ import (
"io"
"mime"
"mime/multipart"
- "net"
+ "sync"
"net/http"
"net/textproto"
"path/filepath"
@@ -21,6 +21,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)
type UploadOption struct {
@@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN
}
}
+var (
+ fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
+ uploader *Uploader
+ uploaderErr error
+ once sync.Once
+)
+
// HTTPClient interface for testing
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
-var (
- HttpClient HTTPClient
-)
+// Uploader
+type Uploader struct {
+ httpClient HTTPClient
+}
-func init() {
- HttpClient = &http.Client{Transport: &http.Transport{
- DialContext: (&net.Dialer{
- Timeout: 10 * time.Second,
- KeepAlive: 10 * time.Second,
- }).DialContext,
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
+func NewUploader() (*Uploader, error) {
+ once.Do(func () {
+ // With Dial context
+ var httpClient *util_http_client.HTTPClient
+ httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
+ if uploaderErr != nil {
+ uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
+ }
+ if httpClient != nil {
+ uploader = newUploader(httpClient)
+ }
+ })
+ return uploader, uploaderErr
+}
+
+func newUploader(httpClient HTTPClient) (*Uploader) {
+ return &Uploader{
+ httpClient: httpClient,
+ }
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
-func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
doUploadFunc := func() error {
var host string
@@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
uploadOption.Jwt = auth
var uploadErr error
- uploadResult, uploadErr, data = doUpload(reader, uploadOption)
+ uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
return
}
-var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
-
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
- uploadResult, err = retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = uploader.retriedUploadData(data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = doUpload(reader, option)
+func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = uploader.doUpload(reader, option)
return
}
-func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul
return
}
}
- uploadResult, uploadErr := retriedUploadData(data, option)
+ uploadResult, uploadErr := uploader.retriedUploadData(data, option)
return uploadResult, uploadErr, data
}
-func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
- uploadResult, err = doUploadData(data, option)
+ uploadResult, err = uploader.doUploadData(data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR
return
}
-func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
}
// upload data
- uploadResult, err = upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
- uploadResult, err = upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
return uploadResult, err
}
-func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
// print("+")
- resp, post_err := HttpClient.Do(req)
- defer util.CloseResponse(resp)
+ resp, post_err := uploader.httpClient.Do(req)
+ defer util_http.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
- resp, post_err = HttpClient.Do(req)
- defer util.CloseResponse(resp)
+ resp, post_err = uploader.httpClient.Do(req)
+ defer util_http.CloseResponse(resp)
}
}
if post_err != nil {
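Because NewUploader guards construction with sync.Once (see the hunk above), every call site added in this commit shares one Uploader and its dial-context-enabled HTTP client. A small sketch of that property, assuming the first construction succeeds:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/operation"
)

func main() {
	a, err := operation.NewUploader()
	if err != nil {
		panic(err)
	}
	b, _ := operation.NewUploader() // the sync.Once guard hands back the memoized instance
	fmt.Println(a == b)             // true
}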
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index 9682ca623..4a77fd04a 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -4,7 +4,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
- "github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
@@ -21,7 +21,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 63e1226b6..4bcbc7898 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
@@ -88,9 +89,15 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
if err != nil {
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
- fileId, uploadResult, err, _ := operation.UploadWithRetry(
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+ return "", fmt.Errorf("upload data: %v", err)
+ }
+
+ fileId, uploadResult, err, _ := uploader.UploadWithRetry(
fs,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 167907a5a..768e251a4 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type ReplicationSource interface {
@@ -106,7 +107,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Header, resp *http.Response, err error) {
if fs.proxyByFiler {
- return util.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
+ return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
}
fileUrls, err := fs.LookupFileId(fileId)
@@ -115,7 +116,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
}
for _, fileUrl := range fileUrls {
- filename, header, resp, err = util.DownloadFile(fileUrl, "")
+ filename, header, resp, err = util_http.DownloadFile(fileUrl, "")
if err != nil {
glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
} else {
diff --git a/weed/s3api/s3api_acl_helper.go b/weed/s3api/s3api_acl_helper.go
index 0332b6a39..b9fb1131e 100644
--- a/weed/s3api/s3api_acl_helper.go
+++ b/weed/s3api/s3api_acl_helper.go
@@ -9,9 +9,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"strings"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
type AccountManager interface {
@@ -32,7 +32,7 @@ func GetAccountId(r *http.Request) string {
// ExtractAcl extracts the acl from the request body, or from the header if request body is empty
func ExtractAcl(r *http.Request, accountManager AccountManager, ownership, bucketOwnerId, ownerId, accountId string) (grants []*s3.Grant, errCode s3err.ErrorCode) {
if r.Body != nil && r.Body != http.NoBody {
- defer util.CloseRequest(r)
+ defer util_http.CloseRequest(r)
var acp s3.AccessControlPolicy
err := xmlutil.UnmarshalXML(&acp, xml.NewDecoder(r.Body), "")
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index e3fa778a5..f451e60a2 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
- "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@@ -26,6 +25,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
@@ -507,7 +507,7 @@ func (s3a *S3ApiServer) PutBucketOwnershipControls(w http.ResponseWriter, r *htt
}
var v s3.OwnershipControls
- defer util.CloseRequest(r)
+ defer util_http.CloseRequest(r)
err := xmlutil.UnmarshalXML(&v, xml.NewDecoder(r.Body), "")
if err != nil {
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index ff7e92304..3ab72285f 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -16,7 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
@@ -171,7 +171,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 8d13fe17e..4ca8010d2 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -87,12 +88,12 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
- _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
+ _, _, resp, err := util_http.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name)
if tagErr != nil {
@@ -175,12 +176,12 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
- resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
+ resp, dataReader, err := util_http.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
defer dataReader.Close()
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 5e46c1459..e0517ffb7 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -20,6 +20,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)
type S3ApiServerOption struct {
@@ -44,7 +46,7 @@ type S3ApiServer struct {
cb *CircuitBreaker
randomClientId int32
filerGuard *security.Guard
- client *http.Client
+ client util_http_client.HTTPClientInterface
bucketRegistry *BucketRegistry
}
@@ -84,10 +86,9 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
}
s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
if option.LocalFilerSocket == "" {
- s3ApiServer.client = &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
+ if s3ApiServer.client, err = util_http.NewGlobalHttpClient(); err != nil {
+ return nil, err
+ }
} else {
s3ApiServer.client = &http.Client{
Transport: &http.Transport{
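For components that keep a client field, such as the S3 server above, the new HTTPClientInterface abstracts over both the shared client and a plain *http.Client (still used for the local Unix-socket path). A minimal sketch with a hypothetical newClient constructor; the uploader additionally passes util_http_client.AddDialContext here:

package example

import (
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)

// newClient populates an HTTPClientInterface field the way NewS3ApiServer now does.
func newClient() (util_http_client.HTTPClientInterface, error) {
	client, err := util_http.NewGlobalHttpClient()
	if err != nil {
		return nil, err
	}
	return client, nil
}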
diff --git a/weed/server/common.go b/weed/server/common.go
index 7be2f8a76..e6f6cdb88 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
PairMap: pu.PairMap,
Jwt: assignResult.Auth,
}
- uploadResult, err := operation.UploadData(pu.Data, uploadOption)
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index e04994569..c1a26ca11 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,24 +3,13 @@ package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"math/rand"
"net/http"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
-var (
- client *http.Client
-)
-
-func init() {
- client = &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
-}
-
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
@@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
}
}
- proxyResponse, postErr := client.Do(proxyReq)
+ proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
w.WriteHeader(http.StatusInternalServerError)
return
}
- defer util.CloseResponse(proxyResponse)
+ defer util_http.CloseResponse(proxyResponse)
for k, v := range proxyResponse.Header {
w.Header()[k] = v
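The filer proxy handler above drops its package-level client and init() in favor of the shared global client. A minimal sketch of the replacement pattern with a hypothetical doProxy helper:

package example

import (
	"net/http"

	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

// doProxy issues a request through the shared global client and releases the
// response with the helper, as proxyToVolumeServer now does.
func doProxy(req *http.Request) (int, error) {
	resp, err := util_http.GetGlobalHttpClient().Do(req)
	if err != nil {
		return 0, err
	}
	defer util_http.CloseResponse(resp)
	return resp.StatusCode, nil
}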
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index b186fd34e..ab3988f8c 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
fs.autoChunk(ctx, w, r, contentLength, so)
}
- util.CloseRequest(r)
+ util_http.CloseRequest(r)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 029fbb7c9..1c7ed0c3c 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -308,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
PairMap: nil,
Jwt: auth,
}
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return uploaderErr
+ }
+
var uploadErr error
- uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption)
+ uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 6cf7d65b1..f8d129bf3 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
PairMap: pu.PairMap,
Jwt: auth,
}
- uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption)
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
+ }
+
+ uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 8c8eba078..d0d1575cf 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
PairMap: pairMap,
Jwt: auth,
}
- uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return nil, err, []byte{}
+ }
+
+ uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 014bdb7f8..65fa622e7 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -30,6 +30,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -256,7 +257,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
}
director(req)
}
- proxy.Transport = util.Transport
+ proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
proxy.ServeHTTP(w, r)
}
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 5e3e42dea..7479b5535 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -113,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
location := ms.findVolumeLocation(collection, vid)
if location.Error == "" {
loc := location.Locations[rand.Intn(len(location.Locations))]
- var url string
+ url, _ := util_http.NormalizeUrl(loc.PublicUrl)
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = url + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
+ url = url + r.URL.Path
}
http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
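NormalizeUrl now returns (string, error) because the scheme comes from the global client's configuration instead of a hard-coded "http://". A sketch of building the redirect target with the new signature; redirectTarget is a hypothetical name:

    package main

    import (
        util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    )

    // redirectTarget normalizes the volume server's public URL first, then
    // appends the original path and query, matching the handler above.
    func redirectTarget(publicUrl, path, rawQuery string) (string, error) {
        base, err := util_http.NormalizeUrl(publicUrl)
        if err != nil {
            return "", err
        }
        if rawQuery != "" {
            return base + path + "?" + rawQuery, nil
        }
        return base + path, nil
    }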
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 64254b3b8..4452e019b 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
PairMap: nil,
Jwt: security.EncodedJwt(req.Auth),
}
- if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
- if err == nil {
- err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
- }
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
+ return
+ }
+
+ if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
}
}(replica.Url)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index ccbd42054..15d639f49 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,6 +27,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
@@ -81,7 +82,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
if vs.ReadMode == "proxy" {
// proxy client request to target server
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
+ u, _ := url.Parse(rawURL)
r.URL.Host = u.Host
r.URL.Scheme = u.Scheme
request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
@@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- response, err := client.Do(request)
+ response, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
- defer util.CloseResponse(response)
+ defer util_http.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
for _, v := range vv {
@@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
} else {
// redirect
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
+ u, _ := url.Parse(rawURL)
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
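The volume read path applies the same normalization before parsing, rather than concatenating a scheme by hand. A minimal sketch; parseLocation is a hypothetical helper:

    package main

    import (
        "net/url"

        util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    )

    // parseLocation normalizes the location's URL (the scheme now follows the
    // global client's http/https setting) and parses it for proxy or redirect use.
    func parseLocation(location string) (*url.URL, error) {
        rawURL, err := util_http.NormalizeUrl(location)
        if err != nil {
            return nil, err
        }
        return url.Parse(rawURL)
    }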
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 97d51dad7..f8d964552 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -392,8 +392,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
}
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
+ return nil, fmt.Errorf("upload data: %v", uploaderErr)
+ }
- fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
f.fs,
&filer_pb.AssignVolumeRequest{
Count: 1,
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index f6d55c616..b77feb8e3 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -19,14 +19,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
-)
-
-var (
- client *http.Client
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
- client = &http.Client{}
Commands = append(Commands, &commandFsMergeVolumes{})
}
@@ -104,7 +100,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
- defer client.CloseIdleConnections()
+ defer util_http.GetGlobalHttpClient().CloseIdleConnections()
return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
@@ -304,7 +300,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
if err != nil {
return err
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
defer reader.Close()
var filename string
@@ -322,7 +318,12 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
md5 := resp.Header.Get("Content-MD5")
- _, err, _ = operation.Upload(reader, &operation.UploadOption{
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return err
+ }
+
+ _, err, _ = uploader.Upload(reader, &operation.UploadOption{
UploadUrl: uploadURL,
Filename: filename,
IsInputCompressed: isCompressed,
@@ -348,12 +349,12 @@ func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
}
req.Header.Add("Accept-Encoding", "gzip")
- r, err := client.Do(req)
+ r, err := util_http.GetGlobalHttpClient().Do(req)
if err != nil {
return nil, nil, err
}
if r.StatusCode >= 400 {
- util.CloseResponse(r)
+ util_http.CloseResponse(r)
return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
index 2be61f72a..accce60ba 100644
--- a/weed/shell/command_s3_clean_uploads.go
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
@@ -90,7 +91,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
- err = util.Delete(deleteUrl, string(encodedJwt))
+ err = util_http.Delete(deleteUrl, string(encodedJwt))
if err != nil && err.Error() != "" {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 1d27fae1d..dd58175cf 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -31,6 +31,7 @@ import (
"strings"
"sync"
"time"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
@@ -552,9 +553,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err)
}
- client := &http.Client{}
-
- resp, err := client.Do(req)
+ resp, err := util_http.GetGlobalHttpClient().Do(req)
if err != nil {
fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err)
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 82c2db79c..b4a7d649c 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -20,6 +20,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
@@ -105,7 +106,12 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
BytesBuffer: bytesBuffer,
}
- _, err := operation.UploadData(n.Data, uploadOption)
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
+ return err
+ }
+ _, err = uploader.UploadData(n.Data, uploadOption)
if err != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
}
@@ -144,7 +150,7 @@ func ReplicatedDelete(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOp
if len(remoteLocations) > 0 { //send to other replica locations
if err = DistributedOperation(remoteLocations, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
+ return util_http.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
}); err != nil {
size = 0
}
diff --git a/weed/util/http/client/http_client.go b/weed/util/http/client/http_client.go
new file mode 100644
index 000000000..d1d2f5c56
--- /dev/null
+++ b/weed/util/http/client/http_client.go
@@ -0,0 +1,201 @@
+package client
+
+import (
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ util "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/spf13/viper"
+ "io"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+ "sync"
+)
+
+var (
+ loadSecurityConfigOnce sync.Once
+)
+
+type HTTPClient struct {
+ Client *http.Client
+ Transport *http.Transport
+ expectHttpsScheme bool
+}
+
+func (httpClient *HTTPClient) Do(req *http.Request) (*http.Response, error) {
+ req.URL.Scheme = httpClient.GetHttpScheme()
+ return httpClient.Client.Do(req)
+}
+
+func (httpClient *HTTPClient) Get(url string) (resp *http.Response, err error) {
+ url, err = httpClient.NormalizeHttpScheme(url)
+ if err != nil {
+ return nil, err
+ }
+ return httpClient.Client.Get(url)
+}
+
+func (httpClient *HTTPClient) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) {
+ url, err = httpClient.NormalizeHttpScheme(url)
+ if err != nil {
+ return nil, err
+ }
+ return httpClient.Client.Post(url, contentType, body)
+}
+
+func (httpClient *HTTPClient) PostForm(url string, data url.Values) (resp *http.Response, err error) {
+ url, err = httpClient.NormalizeHttpScheme(url)
+ if err != nil {
+ return nil, err
+ }
+ return httpClient.Client.PostForm(url, data)
+}
+
+func (httpClient *HTTPClient) Head(url string) (resp *http.Response, err error) {
+ url, err = httpClient.NormalizeHttpScheme(url)
+ if err != nil {
+ return nil, err
+ }
+ return httpClient.Client.Head(url)
+}
+func (httpClient *HTTPClient) CloseIdleConnections() {
+ httpClient.Client.CloseIdleConnections()
+}
+
+func (httpClient *HTTPClient) GetClientTransport() *http.Transport {
+ return httpClient.Transport
+}
+
+func (httpClient *HTTPClient) GetHttpScheme() string {
+ if httpClient.expectHttpsScheme {
+ return "https"
+ }
+ return "http"
+}
+
+func (httpClient *HTTPClient) NormalizeHttpScheme(rawURL string) (string, error) {
+ expectedScheme := httpClient.GetHttpScheme()
+
+ if !(strings.HasPrefix(rawURL, "http://") || strings.HasPrefix(rawURL, "https://")) {
+ return expectedScheme + "://" + rawURL, nil
+ }
+
+ parsedURL, err := url.Parse(rawURL)
+ if err != nil {
+ return "", err
+ }
+
+ if expectedScheme != parsedURL.Scheme {
+ parsedURL.Scheme = expectedScheme
+ }
+ return parsedURL.String(), nil
+}
+
+func NewHttpClient(clientName ClientName, opts ...HttpClientOpt) (*HTTPClient, error) {
+ httpClient := HTTPClient{}
+ httpClient.expectHttpsScheme = checkIsHttpsClientEnabled(clientName)
+ var tlsConfig *tls.Config = nil
+
+ if httpClient.expectHttpsScheme {
+ clientCertPair, err := getClientCertPair(clientName)
+ if err != nil {
+ return nil, err
+ }
+
+ clientCaCert, clientCaCertName, err := getClientCaCert(clientName)
+ if err != nil {
+ return nil, err
+ }
+
+ if clientCertPair != nil || len(clientCaCert) != 0 {
+ caCertPool, err := createHTTPClientCertPool(clientCaCert, clientCaCertName)
+ if err != nil {
+ return nil, err
+ }
+
+ tlsConfig = &tls.Config{
+ Certificates: []tls.Certificate{},
+ RootCAs: caCertPool,
+ InsecureSkipVerify: false,
+ }
+
+ if clientCertPair != nil {
+ tlsConfig.Certificates = append(tlsConfig.Certificates, *clientCertPair)
+ }
+ }
+ }
+
+ httpClient.Transport = &http.Transport{
+ MaxIdleConns: 1024,
+ MaxIdleConnsPerHost: 1024,
+ TLSClientConfig: tlsConfig,
+ }
+ httpClient.Client = &http.Client{
+ Transport: httpClient.Transport,
+ }
+
+ for _, opt := range opts {
+ opt(&httpClient)
+ }
+ return &httpClient, nil
+}
+
+func getStringOptionFromSecurityConfiguration(clientName ClientName, stringOptionName string) string {
+ util.LoadSecurityConfiguration()
+ return viper.GetString(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), stringOptionName))
+}
+
+func getBoolOptionFromSecurityConfiguration(clientName ClientName, boolOptionName string) bool {
+ util.LoadSecurityConfiguration()
+ return viper.GetBool(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), boolOptionName))
+}
+
+func checkIsHttpsClientEnabled(clientName ClientName) bool {
+ return getBoolOptionFromSecurityConfiguration(clientName, "enabled")
+}
+
+func getFileContentFromSecurityConfiguration(clientName ClientName, fileType string) ([]byte, string, error) {
+ if fileName := getStringOptionFromSecurityConfiguration(clientName, fileType); fileName != "" {
+ fileContent, err := os.ReadFile(fileName)
+ if err != nil {
+ return nil, fileName, err
+ }
+ return fileContent, fileName, err
+ }
+ return nil, "", nil
+}
+
+func getClientCertPair(clientName ClientName) (*tls.Certificate, error) {
+ certFileName := getStringOptionFromSecurityConfiguration(clientName, "cert")
+ keyFileName := getStringOptionFromSecurityConfiguration(clientName, "key")
+ if certFileName == "" && keyFileName == "" {
+ return nil, nil
+ }
+ if certFileName != "" && keyFileName != "" {
+ clientCert, err := tls.LoadX509KeyPair(certFileName, keyFileName)
+ if err != nil {
+ return nil, fmt.Errorf("error loading client certificate and key: %s", err)
+ }
+ return &clientCert, nil
+ }
+ return nil, fmt.Errorf("error loading key pair: key `%s` and certificate `%s`", keyFileName, certFileName)
+}
+
+func getClientCaCert(clientName ClientName) ([]byte, string, error) {
+ return getFileContentFromSecurityConfiguration(clientName, "ca")
+}
+
+func createHTTPClientCertPool(certContent []byte, fileName string) (*x509.CertPool, error) {
+ certPool := x509.NewCertPool()
+ if len(certContent) == 0 {
+ return certPool, nil
+ }
+
+ ok := certPool.AppendCertsFromPEM(certContent)
+ if !ok {
+ return nil, fmt.Errorf("error processing certificate in %s", fileName)
+ }
+ return certPool, nil
+}
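A hedged usage sketch of the new constructor: the ClientName selects which https.<name>.* section of security.toml is consulted, and the optional AddDialContext installs dial timeouts on the transport. The master address below is an assumption for illustration only:

    package main

    import (
        "log"

        util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
    )

    func main() {
        // Plain HTTP unless https.client.enabled is true in security.toml, in
        // which case the cert/key/ca options are loaded into the TLS config.
        httpClient, err := util_http_client.NewHttpClient(util_http_client.Client, util_http_client.AddDialContext)
        if err != nil {
            log.Fatalf("new http client: %v", err)
        }
        // Get/Post/Head rewrite the scheme, so a bare host:port is acceptable.
        resp, err := httpClient.Get("localhost:9333/dir/status")
        if err != nil {
            log.Fatalf("get: %v", err)
        }
        defer resp.Body.Close()
    }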
diff --git a/weed/util/http/client/http_client_interface.go b/weed/util/http/client/http_client_interface.go
new file mode 100644
index 000000000..7a2d43360
--- /dev/null
+++ b/weed/util/http/client/http_client_interface.go
@@ -0,0 +1,16 @@
+package client
+
+import (
+ "io"
+ "net/http"
+ "net/url"
+)
+
+type HTTPClientInterface interface {
+ Do(req *http.Request) (*http.Response, error)
+ Get(url string) (resp *http.Response, err error)
+ Post(url, contentType string, body io.Reader) (resp *http.Response, err error)
+ PostForm(url string, data url.Values) (resp *http.Response, err error)
+ Head(url string) (resp *http.Response, err error)
+ CloseIdleConnections()
+}
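Both the new *client.HTTPClient and the standard *http.Client satisfy this interface, which is what lets the S3 server earlier in this diff keep a plain http.Client for the unix-socket branch. Compile-time assertions as a sketch:

    package main

    import (
        "net/http"

        util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
    )

    // Compile-time checks; both types expose Do, Get, Post, PostForm, Head
    // and CloseIdleConnections with the required signatures.
    var _ util_http_client.HTTPClientInterface = (*util_http_client.HTTPClient)(nil)
    var _ util_http_client.HTTPClientInterface = (*http.Client)(nil)

    func main() {}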
diff --git a/weed/util/http/client/http_client_name.go b/weed/util/http/client/http_client_name.go
new file mode 100644
index 000000000..aedaebbc6
--- /dev/null
+++ b/weed/util/http/client/http_client_name.go
@@ -0,0 +1,14 @@
+package client
+
+import "strings"
+
+type ClientName int
+
+//go:generate stringer -type=ClientName -output=http_client_name_string.go
+const (
+ Client ClientName = iota
+)
+
+func (name *ClientName) LowerCaseString() string {
+ return strings.ToLower(name.String())
+}
diff --git a/weed/util/http/client/http_client_name_string.go b/weed/util/http/client/http_client_name_string.go
new file mode 100644
index 000000000..652fcdaac
--- /dev/null
+++ b/weed/util/http/client/http_client_name_string.go
@@ -0,0 +1,23 @@
+// Code generated by "stringer -type=ClientName -output=http_client_name_string.go"; DO NOT EDIT.
+
+package client
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[Client-0]
+}
+
+const _ClientName_name = "Client"
+
+var _ClientName_index = [...]uint8{0, 6}
+
+func (i ClientName) String() string {
+ if i < 0 || i >= ClientName(len(_ClientName_index)-1) {
+ return "ClientName(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _ClientName_name[_ClientName_index[i]:_ClientName_index[i+1]]
+}
diff --git a/weed/util/http/client/http_client_opt.go b/weed/util/http/client/http_client_opt.go
new file mode 100644
index 000000000..1ff9d533d
--- /dev/null
+++ b/weed/util/http/client/http_client_opt.go
@@ -0,0 +1,18 @@
+package client
+
+import (
+ "net"
+ "time"
+)
+
+type HttpClientOpt = func(clientCfg *HTTPClient)
+
+func AddDialContext(httpClient *HTTPClient) {
+ dialContext := (&net.Dialer{
+ Timeout: 10 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).DialContext
+
+ httpClient.Transport.DialContext = dialContext
+ httpClient.Client.Transport = httpClient.Transport
+}
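Options are plain functions over the client under construction, so callers can supply their own. WithTimeout below is a hypothetical example in the same shape as AddDialContext, not part of this change; it would be passed as NewHttpClient(Client, WithTimeout(30*time.Second)):

    package client

    import "time"

    // WithTimeout returns an option that caps the total request time on the
    // underlying *http.Client of the client being constructed.
    func WithTimeout(d time.Duration) HttpClientOpt {
        return func(clientCfg *HTTPClient) {
            clientCfg.Client.Timeout = d
        }
    }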
diff --git a/weed/util/http/http_global_client_init.go b/weed/util/http/http_global_client_init.go
new file mode 100644
index 000000000..0dcb05cfd
--- /dev/null
+++ b/weed/util/http/http_global_client_init.go
@@ -0,0 +1,27 @@
+package http
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
+)
+
+var (
+ globalHttpClient *util_http_client.HTTPClient
+)
+
+func NewGlobalHttpClient(opt ...util_http_client.HttpClientOpt) (*util_http_client.HTTPClient, error) {
+ return util_http_client.NewHttpClient(util_http_client.Client, opt...)
+}
+
+func GetGlobalHttpClient() *util_http_client.HTTPClient {
+ return globalHttpClient
+}
+
+func InitGlobalHttpClient() {
+ var err error
+
+ globalHttpClient, err = NewGlobalHttpClient()
+ if err != nil {
+ glog.Fatalf("error init global http client: %v", err)
+ }
+}
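GetGlobalHttpClient returns nil until InitGlobalHttpClient has run, so any binary embedding these packages must initialize it up front, as the weed.go change below does. A minimal sketch:

    package main

    import (
        util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    )

    func main() {
        // Must run before any helper in weed/util/http touches the network.
        util_http.InitGlobalHttpClient()
        // ... rest of the program ...
    }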
diff --git a/weed/util/http_util.go b/weed/util/http/http_global_client_util.go
index 837b3ccb6..c3931a790 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -1,4 +1,4 @@
-package util
+package http
import (
"compress/gzip"
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
"net/http"
"net/url"
@@ -15,23 +16,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
)
-var (
- client *http.Client
- Transport *http.Transport
-)
-
-func init() {
- Transport = &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }
- client = &http.Client{
- Transport: Transport,
- }
-}
-
func Post(url string, values url.Values) ([]byte, error) {
- r, err := client.PostForm(url, values)
+ r, err := GetGlobalHttpClient().PostForm(url, values)
if err != nil {
return nil, err
}
@@ -64,7 +50,7 @@ func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
maybeAddAuth(request, jwt)
request.Header.Add("Accept-Encoding", "gzip")
- response, err := client.Do(request)
+ response, err := GetGlobalHttpClient().Do(request)
if err != nil {
return nil, true, err
}
@@ -94,7 +80,7 @@ func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
}
func Head(url string) (http.Header, error) {
- r, err := client.Head(url)
+ r, err := GetGlobalHttpClient().Head(url)
if err != nil {
return nil, err
}
@@ -117,7 +103,7 @@ func Delete(url string, jwt string) error {
if err != nil {
return err
}
- resp, e := client.Do(req)
+ resp, e := GetGlobalHttpClient().Do(req)
if e != nil {
return e
}
@@ -145,7 +131,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err
if err != nil {
return
}
- resp, err := client.Do(req)
+ resp, err := GetGlobalHttpClient().Do(req)
if err != nil {
return
}
@@ -159,7 +145,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err
}
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
- r, err := client.PostForm(url, values)
+ r, err := GetGlobalHttpClient().PostForm(url, values)
if err != nil {
return err
}
@@ -182,7 +168,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB
}
func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
- r, err := client.PostForm(url, values)
+ r, err := GetGlobalHttpClient().PostForm(url, values)
if err != nil {
return err
}
@@ -201,7 +187,7 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head
maybeAddAuth(req, jwt)
- response, err := client.Do(req)
+ response, err := GetGlobalHttpClient().Do(req)
if err != nil {
return "", nil, nil, err
}
@@ -219,14 +205,11 @@ func DownloadFile(fileUrl string, jwt string) (filename string, header http.Head
}
func Do(req *http.Request) (resp *http.Response, err error) {
- return client.Do(req)
+ return GetGlobalHttpClient().Do(req)
}
-func NormalizeUrl(url string) string {
- if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
- return url
- }
- return "http://" + url
+func NormalizeUrl(url string) (string, error) {
+ return GetGlobalHttpClient().NormalizeHttpScheme(url)
}
func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
@@ -249,7 +232,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
req.Header.Set("Accept-Encoding", "gzip")
}
- r, err := client.Do(req)
+ r, err := GetGlobalHttpClient().Do(req)
if err != nil {
return 0, err
}
@@ -322,7 +305,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
- r, err := client.Do(req)
+ r, err := GetGlobalHttpClient().Do(req)
if err != nil {
return true, err
}
@@ -368,12 +351,12 @@ func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
- decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ decryptedData, err := util.Decrypt(encryptedData, util.CipherKey(cipherKey))
if err != nil {
return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
}
if isContentCompressed {
- decryptedData, err = DecompressData(decryptedData)
+ decryptedData, err = util.DecompressData(decryptedData)
if err != nil {
glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
}
@@ -403,7 +386,7 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt
maybeAddAuth(req, jwt)
- r, err := client.Do(req)
+ r, err := GetGlobalHttpClient().Do(req)
if err != nil {
return nil, nil, err
}
@@ -463,7 +446,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
var shouldRetry bool
- for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 {
+ for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
n = 0
if strings.Contains(urlString, "%") {
@@ -494,4 +477,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
return n, err
-}
+}
\ No newline at end of file
diff --git a/weed/weed.go b/weed/weed.go
index a821cd72f..5139dd39c 100644
--- a/weed/weed.go
+++ b/weed/weed.go
@@ -20,6 +20,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/seaweedfs/seaweedfs/weed/command"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var IsDebug *bool
@@ -86,6 +87,7 @@ func main() {
return
}
+ util_http.InitGlobalHttpClient()
for _, cmd := range commands {
if cmd.Name() == args[0] && cmd.Run != nil {
cmd.Flag.Usage = func() { cmd.Usage() }