aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go31
-rw-r--r--weed/operation/chunked_file.go10
-rw-r--r--weed/operation/delete_content.go20
-rw-r--r--weed/operation/grpc_client.go52
-rw-r--r--weed/operation/lookup.go10
-rw-r--r--weed/operation/needle_parse_test.go22
-rw-r--r--weed/operation/submit.go36
-rw-r--r--weed/operation/sync_volume.go5
-rw-r--r--weed/operation/tail_volume.go7
-rw-r--r--weed/operation/upload_content.go143
10 files changed, 199 insertions, 137 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index f441dcb50..b716300e2 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
@@ -26,9 +27,11 @@ type AssignResult struct {
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
+ GrpcPort int `json:"grpcPort,omitempty"`
Count uint64 `json:"count,omitempty"`
Error string `json:"error,omitempty"`
Auth security.EncodedJwt `json:"auth,omitempty"`
+ Replicas []Location `json:"replicas,omitempty"`
}
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
@@ -45,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
@@ -63,15 +66,22 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
return grpcErr
}
+ if resp.Error != "" {
+ return fmt.Errorf("assignRequest: %v", resp.Error)
+ }
+
ret.Count = resp.Count
ret.Fid = resp.Fid
- ret.Url = resp.Url
- ret.PublicUrl = resp.PublicUrl
+ ret.Url = resp.Location.Url
+ ret.PublicUrl = resp.Location.PublicUrl
+ ret.GrpcPort = int(resp.Location.GrpcPort)
ret.Error = resp.Error
ret.Auth = security.EncodedJwt(resp.Auth)
-
- if resp.Error != "" {
- return fmt.Errorf("assignRequest: %v", resp.Error)
+ for _, r := range resp.Replicas {
+ ret.Replicas = append(ret.Replicas, Location{
+ Url: r.Url,
+ PublicUrl: r.PublicUrl,
+ })
}
return nil
@@ -93,9 +103,9 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
return ret, lastError
}
-func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
+func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
- WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
@@ -123,6 +133,7 @@ type StorageOption struct {
Collection string
DataCenter string
Rack string
+ DataNode string
TtlSeconds int32
Fsync bool
VolumeGrowthCount uint32
@@ -141,9 +152,10 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
DiskType: so.DiskType,
DataCenter: so.DataCenter,
Rack: so.Rack,
+ DataNode: so.DataNode,
WritableVolumeCount: so.VolumeGrowthCount,
}
- if so.DataCenter != "" || so.Rack != "" {
+ if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" {
altRequest = &VolumeAssignRequest{
Count: uint64(count),
Replication: so.Replication,
@@ -152,6 +164,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
DiskType: so.DiskType,
DataCenter: "",
Rack: "",
+ DataNode: "",
WritableVolumeCount: so.VolumeGrowthCount,
}
}
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 94939f1f3..45068bbcc 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "io/ioutil"
"net/http"
"sort"
"sync"
@@ -13,6 +12,7 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -42,7 +42,7 @@ type ChunkManifest struct {
type ChunkedFileReader struct {
totalSize int64
chunkList []*ChunkInfo
- master string
+ master pb.ServerAddress
pos int64
pr *io.PipeReader
pw *io.PipeWriter
@@ -107,7 +107,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
return written, err
}
defer func() {
- io.Copy(ioutil.Discard, resp.Body)
+ io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()
@@ -127,7 +127,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
return io.Copy(w, resp.Body)
}
-func NewChunkedFileReader(chunkList []*ChunkInfo, master string, grpcDialOption grpc.DialOption) *ChunkedFileReader {
+func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
var totalSize int64
for _, chunk := range chunkList {
totalSize += chunk.Size
@@ -176,7 +176,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, jwt, lookupError := LookupFileId(func() string {
+ fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
return cf.master
}, cf.grpcDialOption, ci.Fid)
if lookupError != nil {
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 15d07a52e..587cf1d01 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"net/http"
"strings"
@@ -74,22 +75,23 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
return ret, err
}
- server_to_fileIds := make(map[string][]string)
+ server_to_fileIds := make(map[pb.ServerAddress][]string)
for vid, result := range lookupResults {
if result.Error != "" {
ret = append(ret, &volume_server_pb.DeleteResult{
FileId: vid,
Status: http.StatusBadRequest,
- Error: err.Error()},
+ Error: result.Error},
)
continue
}
for _, location := range result.Locations {
- if _, ok := server_to_fileIds[location.Url]; !ok {
- server_to_fileIds[location.Url] = make([]string, 0)
+ serverAddress := location.ServerAddress()
+ if _, ok := server_to_fileIds[serverAddress]; !ok {
+ server_to_fileIds[serverAddress] = make([]string, 0)
}
- server_to_fileIds[location.Url] = append(
- server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
+ server_to_fileIds[serverAddress] = append(
+ server_to_fileIds[serverAddress], vid_to_fileIds[vid]...)
}
}
@@ -97,7 +99,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
var wg sync.WaitGroup
for server, fidList := range server_to_fileIds {
wg.Add(1)
- go func(server string, fidList []string) {
+ go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
@@ -119,9 +121,9 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 025a65b38..9b68d2286 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,67 +1,27 @@
package operation
import (
- "fmt"
- "strconv"
- "strings"
-
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
-
- grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
- if err != nil {
- return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err)
- }
+func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress, grpcDialOption)
-
-}
+ }, volumeServer.ToGrpcAddress(), grpcDialOption)
-func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
- sepIndex := strings.LastIndex(volumeServer, ":")
- port, err := strconv.Atoi(volumeServer[sepIndex+1:])
- if err != nil {
- glog.Errorf("failed to parse volume server address: %v", volumeServer)
- return "", err
- }
- return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
-
- masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
- if parseErr != nil {
- return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr)
- }
+func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress, grpcDialOption)
-
-}
-
-func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error {
-
- filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer)
- if parseErr != nil {
- return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
- }
-
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, grpcDialOption)
+ }, masterServer.ToGrpcAddress(), grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 8717f6b36..1eb5dd320 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"math/rand"
"strings"
@@ -15,7 +16,13 @@ import (
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
+ GrpcPort int `json:"grpcPort,omitempty"`
}
+
+func (l *Location) ServerAddress() pb.ServerAddress {
+ return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
+}
+
type LookupResult struct {
VolumeOrFileId string `json:"volumeOrFileId,omitempty"`
Locations []Location `json:"locations,omitempty"`
@@ -72,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
- err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,
@@ -89,6 +96,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
locations = append(locations, Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
+ GrpcPort: int(loc.GrpcPort),
})
}
if vidLocations.Error != "" {
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index d7e8a4162..2b44b3b26 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -53,7 +53,16 @@ func TestCreateNeedleFromRequest(t *testing.T) {
assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize)
}
- uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "")
+ uploadOption := &UploadOption{
+ UploadUrl: "http://localhost:8080/389,0f084d17353afda0",
+ Filename: "t.txt",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: "",
+ }
+ uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -72,7 +81,16 @@ func TestCreateNeedleFromRequest(t *testing.T) {
fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
}
gzippedData, _ := util.GzipData([]byte(textContent))
- Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "")
+ uploadOption := &UploadOption{
+ UploadUrl: "http://localhost:8080/389,0f084d17353afda0",
+ Filename: "t.txt",
+ Cipher: false,
+ IsInputCompressed: true,
+ MimeType: "text/plain",
+ PairMap: nil,
+ Jwt: "",
+ }
+ Upload(bytes.NewReader(gzippedData), uploadOption)
}
/*
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 87c5e4279..648df174a 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,6 +1,7 @@
package operation
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"mime"
"net/url"
@@ -39,7 +40,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-type GetMasterFn func() string
+type GetMasterFn func() pb.ServerAddress
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
@@ -206,7 +207,16 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
}
} else {
- ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
+ uploadOption := &UploadOption{
+ UploadUrl: fileUrl,
+ Filename: baseName,
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: fi.MimeType,
+ PairMap: nil,
+ Jwt: jwt,
+ }
+ ret, e, _ := Upload(fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -219,7 +229,16 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
+ uploadOption := &UploadOption{
+ UploadUrl: fileUrl,
+ Filename: filename,
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: jwt,
+ }
+ uploadResult, uploadError, _ := Upload(reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -236,6 +255,15 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
+ uploadOption := &UploadOption{
+ UploadUrl: u.String(),
+ Filename: manifest.Name,
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "application/json",
+ PairMap: nil,
+ Jwt: jwt,
+ }
+ _, e = UploadData(buf, uploadOption)
return e
}
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index 5562f12ab..de71a198d 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -2,13 +2,14 @@ package operation
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
)
-func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
VolumeId: vid,
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index e3f2c0664..d3449873b 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"google.golang.org/grpc"
@@ -21,13 +22,13 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle
return fmt.Errorf("unable to locate volume %d", vid)
}
- volumeServer := lookup.Locations[0].Url
+ volumeServer := lookup.Locations[0].ServerAddress()
return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
}
-func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
+ return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 8e7c6f733..3d41d2eb5 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -4,22 +4,31 @@ import (
"bytes"
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
- "io/ioutil"
"mime"
"mime/multipart"
+ "net"
"net/http"
"net/textproto"
"path/filepath"
"strings"
"time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
)
+type UploadOption struct {
+ UploadUrl string
+ Filename string
+ Cipher bool
+ IsInputCompressed bool
+ MimeType string
+ PairMap map[string]string
+ Jwt security.EncodedJwt
+}
+
type UploadResult struct {
Name string `json:"name,omitempty"`
Size uint32 `json:"size,omitempty"`
@@ -57,68 +66,72 @@ var (
func init() {
HttpClient = &http.Client{Transport: &http.Transport{
+ DialContext: (&net.Dialer{
+ Timeout: 10 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).DialContext,
MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
-var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
+var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = retriedUploadData(data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
+func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = doUpload(reader, option)
return
}
-func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
} else {
- data, err = ioutil.ReadAll(reader)
+ data, err = io.ReadAll(reader)
if err != nil {
err = fmt.Errorf("read input: %v", err)
return
}
}
- uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ uploadResult, uploadErr := retriedUploadData(data, option)
return uploadResult, uploadErr, data
}
-func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
- uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ uploadResult, err = doUploadData(data, option)
if err == nil {
uploadResult.RetryCount = i
return
} else {
- glog.Warningf("uploading to %s: %v", uploadUrl, err)
+ glog.Warningf("uploading to %s: %v", option.UploadUrl, err)
}
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
return
}
-func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- contentIsGzipped := isInputCompressed
+func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
- if !isInputCompressed {
- if mtype == "" {
- mtype = http.DetectContentType(data)
- // println("detect1 mimetype to", mtype)
- if mtype == "application/octet-stream" {
- mtype = ""
+ if !option.IsInputCompressed {
+ if option.MimeType == "" {
+ option.MimeType = http.DetectContentType(data)
+ // println("detect1 mimetype to", MimeType)
+ if option.MimeType == "application/octet-stream" {
+ option.MimeType = ""
}
}
- if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed {
shouldGzipNow = true
- } else if !iAmSure && mtype == "" && len(data) > 16*1024 {
+ } else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 {
var compressed []byte
compressed, err = util.GzipData(data[0:128])
shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
@@ -131,14 +144,14 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
// this could be double copying
clearDataLen = len(data)
clearData := data
- if shouldGzipNow && !cipher {
+ if shouldGzipNow && !option.Cipher {
compressed, compressErr := util.GzipData(data)
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
if compressErr == nil {
data = compressed
contentIsGzipped = true
}
- } else if isInputCompressed {
+ } else if option.IsInputCompressed {
// just to get the clear data length
clearData, err = util.DecompressData(data)
if err == nil {
@@ -146,7 +159,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
}
}
- if cipher {
+ if option.Cipher {
// encrypt(gzip(data))
// encrypt
@@ -158,23 +171,39 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
}
// upload data
- uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ uploadResult, err = upload_content(func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
- }, "", false, len(encryptedData), "", nil, jwt)
+ }, len(encryptedData), &UploadOption{
+ UploadUrl: option.UploadUrl,
+ Filename: "",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: option.Jwt,
+ })
if uploadResult == nil {
return
}
- uploadResult.Name = filename
- uploadResult.Mime = mtype
+ uploadResult.Name = option.Filename
+ uploadResult.Mime = option.MimeType
uploadResult.CipherKey = cipherKey
uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
- uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ uploadResult, err = upload_content(func(w io.Writer) (err error) {
_, err = w.Write(data)
return
- }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt)
+ }, len(data), &UploadOption{
+ UploadUrl: option.UploadUrl,
+ Filename: option.Filename,
+ Cipher: false,
+ IsInputCompressed: contentIsGzipped,
+ MimeType: option.MimeType,
+ PairMap: option.PairMap,
+ Jwt: option.Jwt,
+ })
if uploadResult == nil {
return
}
@@ -187,20 +216,21 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
return uploadResult, err
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
buf := GetBuffer()
defer PutBuffer(buf)
body_writer := multipart.NewWriter(buf)
h := make(textproto.MIMEHeader)
- h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
- h.Set("Idempotency-Key", uploadUrl)
- if mtype == "" {
- mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
+ filename := fileNameEscaper.Replace(option.Filename)
+ h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename))
+ h.Set("Idempotency-Key", option.UploadUrl)
+ if option.MimeType == "" {
+ option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename)))
}
- if mtype != "" {
- h.Set("Content-Type", mtype)
+ if option.MimeType != "" {
+ h.Set("Content-Type", option.MimeType)
}
- if isGzipped {
+ if option.IsInputCompressed {
h.Set("Content-Encoding", "gzip")
}
@@ -219,28 +249,29 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return nil, err
}
- req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes()))
+ req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes()))
if postErr != nil {
- glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr)
- return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr)
+ glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr)
+ return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
- for k, v := range pairMap {
+ for k, v := range option.PairMap {
req.Header.Set(k, v)
}
- if jwt != "" {
- req.Header.Set("Authorization", "BEARER "+string(jwt))
+ if option.Jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
// print("+")
resp, post_err := HttpClient.Do(req)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
+ glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
resp, post_err = HttpClient.Do(req)
}
}
if post_err != nil {
- return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
+ return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
}
// print("-")
defer util.CloseResponse(resp)
@@ -252,18 +283,18 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return &ret, nil
}
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
+ resp_body, ra_err := io.ReadAll(resp.Body)
if ra_err != nil {
- return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err)
+ return nil, fmt.Errorf("read response body %v: %v", option.UploadUrl, ra_err)
}
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body))
- return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err)
+ glog.Errorf("unmarshal %s: %v", option.UploadUrl, string(resp_body))
+ return nil, fmt.Errorf("unmarshal %v: %v", option.UploadUrl, unmarshal_err)
}
if ret.Error != "" {
- return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error)
+ return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error)
}
ret.ETag = etag
ret.ContentMd5 = resp.Header.Get("Content-MD5")