aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/submit.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/submit.go')
-rw-r--r--weed/operation/submit.go80
1 files changed, 41 insertions, 39 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 73e50cc48..9470afced 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -19,19 +19,15 @@ import (
)
type FilePart struct {
- Reader io.Reader
- FileName string
- FileSize int64
- MimeType string
- ModTime int64 //in seconds
- Replication string
- Collection string
- DataCenter string
- Ttl string
- DiskType string
- Server string //this comes from assign result
- Fid string //this comes from assign result, but customizable
- Fsync bool
+ Reader io.Reader
+ FileName string
+ FileSize int64
+ MimeType string
+ ModTime int64 //in seconds
+ Pref StoragePreference
+ Server string //this comes from assign result
+ Fid string //this comes from assign result, but customizable
+ Fsync bool
}
type SubmitResult struct {
@@ -42,20 +38,29 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
+type StoragePreference struct {
+ Replication string
+ Collection string
+ DataCenter string
+ Ttl string
+ DiskType string
+ MaxMB int
+}
+
type GetMasterFn func(ctx context.Context) pb.ServerAddress
-func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
}
ar := &VolumeAssignRequest{
Count: uint64(len(files)),
- Replication: replication,
- Collection: collection,
- DataCenter: dataCenter,
- Ttl: ttl,
- DiskType: diskType,
+ Replication: pref.Replication,
+ Collection: pref.Collection,
+ DataCenter: pref.DataCenter,
+ Ttl: pref.Ttl,
+ DiskType: pref.DiskType,
}
ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
@@ -73,12 +78,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
if usePublicUrl {
file.Server = ret.PublicUrl
}
- file.Replication = replication
- file.Collection = collection
- file.DataCenter = dataCenter
- file.Ttl = ttl
- file.DiskType = diskType
- results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
+ file.Pref = pref
+ results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -88,8 +89,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
return results, nil
}
-func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
- ret = make([]FilePart, len(fullPathFilenames))
+func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) {
+ ret = make([]*FilePart, len(fullPathFilenames))
for index, file := range fullPathFilenames {
if ret[index], err = newFilePart(file); err != nil {
return
@@ -97,7 +98,8 @@ func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
}
return
}
-func newFilePart(fullPathFilename string) (ret FilePart, err error) {
+func newFilePart(fullPathFilename string) (ret *FilePart, err error) {
+ ret = &FilePart{}
fh, openErr := os.Open(fullPathFilename)
if openErr != nil {
glog.V(0).Info("Failed to open file: ", fullPathFilename)
@@ -121,7 +123,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -145,13 +147,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
var ret *AssignResult
var id string
- if fi.DataCenter != "" {
+ if fi.Pref.DataCenter != "" {
ar := &VolumeAssignRequest{
Count: uint64(chunks),
- Replication: fi.Replication,
- Collection: fi.Collection,
- Ttl: fi.Ttl,
- DiskType: fi.DiskType,
+ Replication: fi.Pref.Replication,
+ Collection: fi.Pref.Collection,
+ Ttl: fi.Pref.Ttl,
+ DiskType: fi.Pref.DiskType,
}
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
@@ -159,13 +161,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
}
}
for i := int64(0); i < chunks; i++ {
- if fi.DataCenter == "" {
+ if fi.Pref.DataCenter == "" {
ar := &VolumeAssignRequest{
Count: 1,
- Replication: fi.Replication,
- Collection: fi.Collection,
- Ttl: fi.Ttl,
- DiskType: fi.DiskType,
+ Replication: fi.Pref.Replication,
+ Collection: fi.Pref.Collection,
+ Ttl: fi.Pref.Ttl,
+ DiskType: fi.Pref.DiskType,
}
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {