diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 55 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 8 |
2 files changed, 50 insertions, 13 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 893bf516c..5fe3462e9 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -49,14 +50,14 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ - Count: primaryRequest.Count, - Replication: primaryRequest.Replication, - Collection: primaryRequest.Collection, - Ttl: primaryRequest.Ttl, - DataCenter: primaryRequest.DataCenter, - Rack: primaryRequest.Rack, - DataNode: primaryRequest.DataNode, - WritableVolumeCount: primaryRequest.WritableVolumeCount, + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, } resp, grpcErr := masterClient.Assign(context.Background(), req) if grpcErr != nil { @@ -101,3 +102,41 @@ func LookupJwt(master string, fileId string) security.EncodedJwt { return security.EncodedJwt(tokenStr) } + +type StorageOption struct { + Replication string + Collection string + DataCenter string + Rack string + TtlSeconds int32 + Fsync bool + VolumeGrowthCount uint32 +} + +func (so *StorageOption) TtlString() string { + return needle.SecondsToTTL(so.TtlSeconds) +} + +func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) { + ar = &VolumeAssignRequest{ + Count: uint64(count), + Replication: so.Replication, + Collection: so.Collection, + Ttl: so.TtlString(), + DataCenter: so.DataCenter, + Rack: so.Rack, + WritableVolumeCount: so.VolumeGrowthCount, + } + if so.DataCenter != "" || so.Rack != "" { + altRequest = &VolumeAssignRequest{ + Count: uint64(count), + Replication: so.Replication, + Collection: so.Collection, + Ttl: so.TtlString(), + DataCenter: "", + Rack: "", + WritableVolumeCount: so.VolumeGrowthCount, + } + } + return +} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 3e96b5909..fccc24b16 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -81,14 +81,11 @@ func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, if ok { data = bytesReader.Bytes } else { - buf := bytebufferpool.Get() - _, err = buf.ReadFrom(reader) - defer bytebufferpool.Put(buf) + data, err = ioutil.ReadAll(reader) if err != nil { err = fmt.Errorf("read input: %v", err) return } - data = buf.Bytes() } uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) return uploadResult, uploadErr, data @@ -172,7 +169,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { _, err = w.Write(data) return - }, filename, contentIsGzipped, 0, mtype, pairMap, jwt) + }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) } if uploadResult == nil { @@ -193,6 +190,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error body_writer := multipart.NewWriter(buf) h := make(textproto.MIMEHeader) h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + h.Set("Idempotency-Key", uploadUrl) if mtype == "" { mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) } |
