diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 31 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 10 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 20 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 52 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 10 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 22 | ||||
| -rw-r--r-- | weed/operation/submit.go | 36 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 5 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 7 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 143 |
10 files changed, 199 insertions, 137 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index f441dcb50..b716300e2 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" @@ -26,9 +27,11 @@ type AssignResult struct { Fid string `json:"fid,omitempty"` Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` + GrpcPort int `json:"grpcPort,omitempty"` Count uint64 `json:"count,omitempty"` Error string `json:"error,omitempty"` Auth security.EncodedJwt `json:"auth,omitempty"` + Replicas []Location `json:"replicas,omitempty"` } func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { @@ -45,7 +48,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, @@ -63,15 +66,22 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest return grpcErr } + if resp.Error != "" { + return fmt.Errorf("assignRequest: %v", resp.Error) + } + ret.Count = resp.Count ret.Fid = resp.Fid - ret.Url = resp.Url - ret.PublicUrl = resp.PublicUrl + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) ret.Error = resp.Error ret.Auth = security.EncodedJwt(resp.Auth) - - if resp.Error != "" { - return fmt.Errorf("assignRequest: %v", resp.Error) + for _, r := range resp.Replicas { + ret.Replicas = append(ret.Replicas, Location{ + Url: r.Url, + PublicUrl: r.PublicUrl, + }) } return nil @@ -93,9 +103,9 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest return ret, lastError } -func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { +func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { - WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + WithMasterServerClient(false, master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, grpcErr := masterClient.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, @@ -123,6 +133,7 @@ type StorageOption struct { Collection string DataCenter string Rack string + DataNode string TtlSeconds int32 Fsync bool VolumeGrowthCount uint32 @@ -141,9 +152,10 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a DiskType: so.DiskType, DataCenter: so.DataCenter, Rack: so.Rack, + DataNode: so.DataNode, WritableVolumeCount: so.VolumeGrowthCount, } - if so.DataCenter != "" || so.Rack != "" { + if so.DataCenter != "" || so.Rack != "" || so.DataNode != "" { altRequest = &VolumeAssignRequest{ Count: uint64(count), Replication: so.Replication, @@ -152,6 +164,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a DiskType: so.DiskType, DataCenter: "", Rack: "", + DataNode: "", WritableVolumeCount: so.VolumeGrowthCount, } } diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 94939f1f3..45068bbcc 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "sort" "sync" @@ -13,6 +12,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -42,7 +42,7 @@ type ChunkManifest struct { type ChunkedFileReader struct { totalSize int64 chunkList []*ChunkInfo - master string + master pb.ServerAddress pos int64 pr *io.PipeReader pw *io.PipeWriter @@ -107,7 +107,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri return written, err } defer func() { - io.Copy(ioutil.Discard, resp.Body) + io.Copy(io.Discard, resp.Body) resp.Body.Close() }() @@ -127,7 +127,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri return io.Copy(w, resp.Body) } -func NewChunkedFileReader(chunkList []*ChunkInfo, master string, grpcDialOption grpc.DialOption) *ChunkedFileReader { +func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader { var totalSize int64 for _, chunk := range chunkList { totalSize += chunk.Size @@ -176,7 +176,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, jwt, lookupError := LookupFileId(func() string { + fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress { return cf.master }, cf.grpcDialOption, ci.Fid) if lookupError != nil { diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 15d07a52e..587cf1d01 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "net/http" "strings" @@ -74,22 +75,23 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str return ret, err } - server_to_fileIds := make(map[string][]string) + server_to_fileIds := make(map[pb.ServerAddress][]string) for vid, result := range lookupResults { if result.Error != "" { ret = append(ret, &volume_server_pb.DeleteResult{ FileId: vid, Status: http.StatusBadRequest, - Error: err.Error()}, + Error: result.Error}, ) continue } for _, location := range result.Locations { - if _, ok := server_to_fileIds[location.Url]; !ok { - server_to_fileIds[location.Url] = make([]string, 0) + serverAddress := location.ServerAddress() + if _, ok := server_to_fileIds[serverAddress]; !ok { + server_to_fileIds[serverAddress] = make([]string, 0) } - server_to_fileIds[location.Url] = append( - server_to_fileIds[location.Url], vid_to_fileIds[vid]...) + server_to_fileIds[serverAddress] = append( + server_to_fileIds[serverAddress], vid_to_fileIds[vid]...) } } @@ -97,7 +99,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str var wg sync.WaitGroup for server, fidList := range server_to_fileIds { wg.Add(1) - go func(server string, fidList []string) { + go func(server pb.ServerAddress, fidList []string) { defer wg.Done() if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil { @@ -119,9 +121,9 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 025a65b38..9b68d2286 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -1,67 +1,27 @@ package operation import ( - "fmt" - "strconv" - "strings" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - - grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) - if err != nil { - return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err) - } +func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, grpcAddress, grpcDialOption) - -} + }, volumeServer.ToGrpcAddress(), grpcDialOption) -func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) { - sepIndex := strings.LastIndex(volumeServer, ":") - port, err := strconv.Atoi(volumeServer[sepIndex+1:]) - if err != nil { - glog.Errorf("failed to parse volume server address: %v", volumeServer) - return "", err - } - return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - - masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) - if parseErr != nil { - return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr) - } +func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress, grpcDialOption) - -} - -func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error { - - filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer) - if parseErr != nil { - return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) - } - - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, grpcDialOption) + }, masterServer.ToGrpcAddress(), grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 8717f6b36..1eb5dd320 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "math/rand" "strings" @@ -15,7 +16,13 @@ import ( type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` + GrpcPort int `json:"grpcPort,omitempty"` } + +func (l *Location) ServerAddress() pb.ServerAddress { + return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort) +} + type LookupResult struct { VolumeOrFileId string `json:"volumeOrFileId,omitempty"` Locations []Location `json:"locations,omitempty"` @@ -72,7 +79,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, @@ -89,6 +96,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids locations = append(locations, Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, + GrpcPort: int(loc.GrpcPort), }) } if vidLocations.Error != "" { diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index d7e8a4162..2b44b3b26 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -53,7 +53,16 @@ func TestCreateNeedleFromRequest(t *testing.T) { assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize) } - uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "") + uploadOption := &UploadOption{ + UploadUrl: "http://localhost:8080/389,0f084d17353afda0", + Filename: "t.txt", + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: "", + } + uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -72,7 +81,16 @@ func TestCreateNeedleFromRequest(t *testing.T) { fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) } gzippedData, _ := util.GzipData([]byte(textContent)) - Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "") + uploadOption := &UploadOption{ + UploadUrl: "http://localhost:8080/389,0f084d17353afda0", + Filename: "t.txt", + Cipher: false, + IsInputCompressed: true, + MimeType: "text/plain", + PairMap: nil, + Jwt: "", + } + Upload(bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 87c5e4279..648df174a 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,6 +1,7 @@ package operation import ( + "github.com/chrislusf/seaweedfs/weed/pb" "io" "mime" "net/url" @@ -39,7 +40,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -type GetMasterFn func() string +type GetMasterFn func() pb.ServerAddress func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) @@ -206,7 +207,16 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) } } else { - ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) + uploadOption := &UploadOption{ + UploadUrl: fileUrl, + Filename: baseName, + Cipher: false, + IsInputCompressed: false, + MimeType: fi.MimeType, + PairMap: nil, + Jwt: jwt, + } + ret, e, _ := Upload(fi.Reader, uploadOption) if e != nil { return 0, e } @@ -219,7 +229,16 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) + uploadOption := &UploadOption{ + UploadUrl: fileUrl, + Filename: filename, + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: jwt, + } + uploadResult, uploadError, _ := Upload(reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -236,6 +255,15 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt) + uploadOption := &UploadOption{ + UploadUrl: u.String(), + Filename: manifest.Name, + Cipher: false, + IsInputCompressed: false, + MimeType: "application/json", + PairMap: nil, + Jwt: jwt, + } + _, e = UploadData(buf, uploadOption) return e } diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index 5562f12ab..de71a198d 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -2,13 +2,14 @@ package operation import ( "context" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" ) -func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { +func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index e3f2c0664..d3449873b 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "google.golang.org/grpc" @@ -21,13 +22,13 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle return fmt.Errorf("unable to locate volume %d", vid) } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations[0].ServerAddress() return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn) } -func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { +func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { + return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 8e7c6f733..3d41d2eb5 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -4,22 +4,31 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" "io" - "io/ioutil" "mime" "mime/multipart" + "net" "net/http" "net/textproto" "path/filepath" "strings" "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" ) +type UploadOption struct { + UploadUrl string + Filename string + Cipher bool + IsInputCompressed bool + MimeType string + PairMap map[string]string + Jwt security.EncodedJwt +} + type UploadResult struct { Name string `json:"name,omitempty"` Size uint32 `json:"size,omitempty"` @@ -57,68 +66,72 @@ var ( func init() { HttpClient = &http.Client{Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext, MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) +var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) +func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) +func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = doUpload(reader, option) return } -func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { +func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes } else { - data, err = ioutil.ReadAll(reader) + data, err = io.ReadAll(reader) if err != nil { err = fmt.Errorf("read input: %v", err) return } } - uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + uploadResult, uploadErr := retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { +func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { - uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + uploadResult, err = doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return } else { - glog.Warningf("uploading to %s: %v", uploadUrl, err) + glog.Warningf("uploading to %s: %v", option.UploadUrl, err) } time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } return } -func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - contentIsGzipped := isInputCompressed +func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + contentIsGzipped := option.IsInputCompressed shouldGzipNow := false - if !isInputCompressed { - if mtype == "" { - mtype = http.DetectContentType(data) - // println("detect1 mimetype to", mtype) - if mtype == "application/octet-stream" { - mtype = "" + if !option.IsInputCompressed { + if option.MimeType == "" { + option.MimeType = http.DetectContentType(data) + // println("detect1 mimetype to", MimeType) + if option.MimeType == "application/octet-stream" { + option.MimeType = "" } } - if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed { + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed { shouldGzipNow = true - } else if !iAmSure && mtype == "" && len(data) > 16*1024 { + } else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 { var compressed []byte compressed, err = util.GzipData(data[0:128]) shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90% @@ -131,14 +144,14 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // this could be double copying clearDataLen = len(data) clearData := data - if shouldGzipNow && !cipher { + if shouldGzipNow && !option.Cipher { compressed, compressErr := util.GzipData(data) // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) if compressErr == nil { data = compressed contentIsGzipped = true } - } else if isInputCompressed { + } else if option.IsInputCompressed { // just to get the clear data length clearData, err = util.DecompressData(data) if err == nil { @@ -146,7 +159,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } } - if cipher { + if option.Cipher { // encrypt(gzip(data)) // encrypt @@ -158,23 +171,39 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } // upload data - uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + uploadResult, err = upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return - }, "", false, len(encryptedData), "", nil, jwt) + }, len(encryptedData), &UploadOption{ + UploadUrl: option.UploadUrl, + Filename: "", + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: option.Jwt, + }) if uploadResult == nil { return } - uploadResult.Name = filename - uploadResult.Mime = mtype + uploadResult.Name = option.Filename + uploadResult.Mime = option.MimeType uploadResult.CipherKey = cipherKey uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + uploadResult, err = upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return - }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) + }, len(data), &UploadOption{ + UploadUrl: option.UploadUrl, + Filename: option.Filename, + Cipher: false, + IsInputCompressed: contentIsGzipped, + MimeType: option.MimeType, + PairMap: option.PairMap, + Jwt: option.Jwt, + }) if uploadResult == nil { return } @@ -187,20 +216,21 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i return uploadResult, err } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { +func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { buf := GetBuffer() defer PutBuffer(buf) body_writer := multipart.NewWriter(buf) h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) - h.Set("Idempotency-Key", uploadUrl) - if mtype == "" { - mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) + filename := fileNameEscaper.Replace(option.Filename) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename)) + h.Set("Idempotency-Key", option.UploadUrl) + if option.MimeType == "" { + option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename))) } - if mtype != "" { - h.Set("Content-Type", mtype) + if option.MimeType != "" { + h.Set("Content-Type", option.MimeType) } - if isGzipped { + if option.IsInputCompressed { h.Set("Content-Encoding", "gzip") } @@ -219,28 +249,29 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, err } - req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes())) + req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes())) if postErr != nil { - glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr) - return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr) + glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr) + return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr) } req.Header.Set("Content-Type", content_type) - for k, v := range pairMap { + for k, v := range option.PairMap { req.Header.Set(k, v) } - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) + if option.Jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") resp, post_err := HttpClient.Do(req) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { + glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) resp, post_err = HttpClient.Do(req) } } if post_err != nil { - return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err) + return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err) } // print("-") defer util.CloseResponse(resp) @@ -252,18 +283,18 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return &ret, nil } - resp_body, ra_err := ioutil.ReadAll(resp.Body) + resp_body, ra_err := io.ReadAll(resp.Body) if ra_err != nil { - return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err) + return nil, fmt.Errorf("read response body %v: %v", option.UploadUrl, ra_err) } unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body)) - return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err) + glog.Errorf("unmarshal %s: %v", option.UploadUrl, string(resp_body)) + return nil, fmt.Errorf("unmarshal %v: %v", option.UploadUrl, unmarshal_err) } if ret.Error != "" { - return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error) + return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error) } ret.ETag = etag ret.ContentMd5 = resp.Header.Get("Content-MD5") |
