diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 9 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 8 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 4 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 15 | ||||
| -rw-r--r-- | weed/operation/submit.go | 29 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 4 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 37 |
7 files changed, 62 insertions, 44 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 5fe3462e9..cc1359961 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -18,6 +18,7 @@ type VolumeAssignRequest struct { Replication string Collection string Ttl string + DiskType string DataCenter string Rack string DataNode string @@ -33,7 +34,7 @@ type AssignResult struct { Auth security.EncodedJwt `json:"auth,omitempty"` } -func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -47,13 +48,14 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, Collection: request.Collection, Ttl: request.Ttl, + DiskType: request.DiskType, DataCenter: request.DataCenter, Rack: request.Rack, DataNode: request.DataNode, @@ -105,6 +107,7 @@ func LookupJwt(master string, fileId string) security.EncodedJwt { type StorageOption struct { Replication string + DiskType string Collection string DataCenter string Rack string @@ -123,6 +126,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a Replication: so.Replication, Collection: so.Collection, Ttl: so.TtlString(), + DiskType: so.DiskType, DataCenter: so.DataCenter, Rack: so.Rack, WritableVolumeCount: so.VolumeGrowthCount, @@ -133,6 +137,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a Replication: so.Replication, Collection: so.Collection, Ttl: so.TtlString(), + DiskType: so.DiskType, DataCenter: "", Rack: "", WritableVolumeCount: so.VolumeGrowthCount, diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 1bac028ff..8506e0518 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error { +func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error { var fileIds []string for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds) + results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) @@ -174,7 +174,9 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.master, ci.Fid) + fileUrl, lookupError := LookupFileId(func() string { + return cf.master + }, ci.Fid) if lookupError != nil { return n, lookupError } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 65baaddf2..8f87882b1 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -28,10 +28,10 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { lookupFunc := func(vids []string) (results map[string]LookupResult, err error) { - results, err = LookupVolumeIds(master, grpcDialOption, vids) + results, err = LookupVolumeIds(masterFn, grpcDialOption, vids) if err == nil && usePublicUrl { for _, result := range results { for _, loc := range result.Locations { diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index d0773e7fd..0372e47b0 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -33,10 +33,10 @@ var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) -func Lookup(server string, vid string) (ret *LookupResult, err error) { +func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { + if ret, err = do_lookup(masterFn, vid); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } } else { @@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } -func do_lookup(server string, vid string) (*LookupResult, error) { +func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) + server := masterFn() jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) if err != nil { return nil, err @@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string) (fullUrl string, err error) { +func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, parts[0]) + lookup, lookupError := Lookup(masterFn, parts[0]) if lookupError != nil { return "", lookupError } @@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { } // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { +func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) var unknown_vids []string @@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, diff --git a/weed/operation/submit.go b/weed/operation/submit.go index e785b68a9..87c5e4279 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -25,6 +25,7 @@ type FilePart struct { Collection string DataCenter string Ttl string + DiskType string Server string //this comes from assign result Fid string //this comes from assign result, but customizable Fsync bool @@ -38,7 +39,9 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { +type GetMasterFn func() string + +func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -49,8 +52,9 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart Collection: collection, DataCenter: dataCenter, Ttl: ttl, + DiskType: diskType, } - ret, err := Assign(master, grpcDialOption, ar) + ret, err := Assign(masterFn, grpcDialOption, ar) if err != nil { for index := range files { results[index].Error = err.Error() @@ -70,7 +74,8 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart file.Collection = collection file.DataCenter = dataCenter file.Ttl = ttl - results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption) + file.DiskType = diskType + results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -113,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -143,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { return } @@ -156,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return } id = ret.Fid @@ -177,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fileUrl, + masterFn, fileUrl, ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -196,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) } } else { ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) @@ -208,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur return } -func upload_one_chunk(filename string, reader io.Reader, master, +func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index a15c21ae4..045948274 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -11,9 +11,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { +func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { // find volume location, replication, ttl info - lookup, err := Lookup(master, vid.String()) + lookup, err := Lookup(masterFn, vid.String()) if err != nil { return fmt.Errorf("look up volume %d: %v", vid, err) } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index fccc24b16..70428bb07 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -58,11 +58,12 @@ var ( func init() { HttpClient = &http.Client{Transport: &http.Transport{ + MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} } -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") +var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) // Upload sends a POST request to a volume server to upload the content with adjustable compression level func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { @@ -99,6 +100,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by } else { glog.Warningf("uploading to %s: %v", uploadUrl, err) } + time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } return } @@ -128,7 +130,8 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // gzip if possible // this could be double copying clearDataLen = len(data) - if shouldGzipNow { + clearData := data + if shouldGzipNow && !cipher { compressed, compressErr := util.GzipData(data) // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) if compressErr == nil { @@ -137,7 +140,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } } else if isInputCompressed { // just to get the clear data length - clearData, err := util.DecompressData(data) + clearData, err = util.DecompressData(data) if err == nil { clearDataLen = len(clearData) } @@ -148,7 +151,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // encrypt cipherKey := util.GenCipherKey() - encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey) if encryptionErr != nil { err = fmt.Errorf("encrypt input: %v", encryptionErr) return @@ -159,26 +162,26 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i _, err = w.Write(encryptedData) return }, "", false, len(encryptedData), "", nil, jwt) - if uploadResult != nil { - uploadResult.Name = filename - uploadResult.Mime = mtype - uploadResult.CipherKey = cipherKey + if uploadResult == nil { + return } + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + uploadResult.Size = uint32(clearDataLen) } else { // upload data uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { _, err = w.Write(data) return }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) - } - - if uploadResult == nil { - return - } - - uploadResult.Size = uint32(clearDataLen) - if contentIsGzipped { - uploadResult.Gzip = 1 + if uploadResult == nil { + return + } + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } } return uploadResult, err |
