aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go9
-rw-r--r--weed/operation/chunked_file.go8
-rw-r--r--weed/operation/delete_content.go4
-rw-r--r--weed/operation/lookup.go15
-rw-r--r--weed/operation/submit.go29
-rw-r--r--weed/operation/tail_volume.go4
-rw-r--r--weed/operation/upload_content.go2
7 files changed, 44 insertions, 27 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 5fe3462e9..cc1359961 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -18,6 +18,7 @@ type VolumeAssignRequest struct {
Replication string
Collection string
Ttl string
+ DiskType string
DataCenter string
Rack string
DataNode string
@@ -33,7 +34,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -47,13 +48,14 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
- lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
Collection: request.Collection,
Ttl: request.Ttl,
+ DiskType: request.DiskType,
DataCenter: request.DataCenter,
Rack: request.Rack,
DataNode: request.DataNode,
@@ -105,6 +107,7 @@ func LookupJwt(master string, fileId string) security.EncodedJwt {
type StorageOption struct {
Replication string
+ DiskType string
Collection string
DataCenter string
Rack string
@@ -123,6 +126,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
+ DiskType: so.DiskType,
DataCenter: so.DataCenter,
Rack: so.Rack,
WritableVolumeCount: so.VolumeGrowthCount,
@@ -133,6 +137,7 @@ func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, a
Replication: so.Replication,
Collection: so.Collection,
Ttl: so.TtlString(),
+ DiskType: so.DiskType,
DataCenter: "",
Rack: "",
WritableVolumeCount: so.VolumeGrowthCount,
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 1bac028ff..8506e0518 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
+func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds)
+ results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -174,7 +174,9 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.master, ci.Fid)
+ fileUrl, lookupError := LookupFileId(func() string {
+ return cf.master
+ }, ci.Fid)
if lookupError != nil {
return n, lookupError
}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 65baaddf2..8f87882b1 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -28,10 +28,10 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
- results, err = LookupVolumeIds(master, grpcDialOption, vids)
+ results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
if err == nil && usePublicUrl {
for _, result := range results {
for _, loc := range result.Locations {
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index d0773e7fd..0372e47b0 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -33,10 +33,10 @@ var (
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
-func Lookup(server string, vid string) (ret *LookupResult, err error) {
+func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) {
locations, cache_err := vc.Get(vid)
if cache_err != nil {
- if ret, err = do_lookup(server, vid); err == nil {
+ if ret, err = do_lookup(masterFn, vid); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
} else {
@@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
return
}
-func do_lookup(server string, vid string) (*LookupResult, error) {
+func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid)
+ server := masterFn()
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
@@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
-func LookupFileId(server string, fileId string) (fullUrl string, err error) {
+func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
}
- lookup, lookupError := Lookup(server, parts[0])
+ lookup, lookupError := Lookup(masterFn, parts[0])
if lookupError != nil {
return "", lookupError
}
@@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
//only query unknown_vids
- err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index e785b68a9..87c5e4279 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -25,6 +25,7 @@ type FilePart struct {
Collection string
DataCenter string
Ttl string
+ DiskType string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
Fsync bool
@@ -38,7 +39,9 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
+type GetMasterFn func() string
+
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -49,8 +52,9 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
+ DiskType: diskType,
}
- ret, err := Assign(master, grpcDialOption, ar)
+ ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
for index := range files {
results[index].Error = err.Error()
@@ -70,7 +74,8 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Collection = collection
file.DataCenter = dataCenter
file.Ttl = ttl
- results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
+ file.DiskType = diskType
+ results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -113,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -143,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
return
}
@@ -156,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@@ -177,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
- master, fileUrl,
+ masterFn, fileUrl,
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -196,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
}
} else {
ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
@@ -208,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
return
}
-func upload_one_chunk(filename string, reader io.Reader, master,
+func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index a15c21ae4..045948274 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -11,9 +11,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
-func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
+func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
// find volume location, replication, ttl info
- lookup, err := Lookup(master, vid.String())
+ lookup, err := Lookup(masterFn, vid.String())
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index fccc24b16..0d5cfd29c 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -58,6 +58,7 @@ var (
func init() {
HttpClient = &http.Client{Transport: &http.Transport{
+ MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
@@ -99,6 +100,7 @@ func retriedUploadData(uploadUrl string, filename string, cipher bool, data []by
} else {
glog.Warningf("uploading to %s: %v", uploadUrl, err)
}
+ time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
return
}