diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2024-11-04 12:08:25 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-04 12:08:25 -0800 |
| commit | dc784bf217172b0e81eb4b3e5eb0e0e38b91849a (patch) | |
| tree | e0795c9a6335571ca852bd84e720eb42d7b3538f /weed/operation/submit.go | |
| parent | ffe908371d4ddbc0cacbe8362a28bf56b322b349 (diff) | |
| download | seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.tar.xz seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.zip | |
merge current message queue code changes (#6201)
* listing files to convert to parquet
* write parquet files
* save logs into parquet files
* pass by value
* compact logs into parquet format
* can skip existing files
* refactor
* refactor
* fix compilation
* when no partition found
* refactor
* add untested parquet file read
* rename package
* refactor
* rename files
* remove unused
* add merged log read func
* parquet wants to know the file size
* rewind by time
* pass in stop ts
* add stop ts
* adjust log
* minor
* adjust log
* skip .parquet files when reading message logs
* skip non message files
* Update subscriber_record.go
* send messages
* skip message data with only ts
* skip non log files
* update parquet-go package
* ensure a valid record type
* add new field to a record type
* Update read_parquet_to_log.go
* fix parquet file name generation
* separating reading parquet and logs
* add key field
* add skipped logs
* use in memory cache
* refactor
* refactor
* refactor
* refactor, and change compact log
* refactor
* rename
* refactor
* fix format
* prefix v to version directory
Diffstat (limited to 'weed/operation/submit.go')
| -rw-r--r-- | weed/operation/submit.go | 80 |
1 files changed, 41 insertions, 39 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 73e50cc48..9470afced 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -19,19 +19,15 @@ import ( ) type FilePart struct { - Reader io.Reader - FileName string - FileSize int64 - MimeType string - ModTime int64 //in seconds - Replication string - Collection string - DataCenter string - Ttl string - DiskType string - Server string //this comes from assign result - Fid string //this comes from assign result, but customizable - Fsync bool + Reader io.Reader + FileName string + FileSize int64 + MimeType string + ModTime int64 //in seconds + Pref StoragePreference + Server string //this comes from assign result + Fid string //this comes from assign result, but customizable + Fsync bool } type SubmitResult struct { @@ -42,20 +38,29 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } +type StoragePreference struct { + Replication string + Collection string + DataCenter string + Ttl string + DiskType string + MaxMB int +} + type GetMasterFn func(ctx context.Context) pb.ServerAddress -func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { +func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName } ar := &VolumeAssignRequest{ Count: uint64(len(files)), - Replication: replication, - Collection: collection, - DataCenter: dataCenter, - Ttl: ttl, - DiskType: diskType, + Replication: pref.Replication, + Collection: pref.Collection, + DataCenter: pref.DataCenter, + Ttl: pref.Ttl, + DiskType: pref.DiskType, } ret, err := Assign(masterFn, grpcDialOption, ar) if err != nil { @@ -73,12 +78,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F if usePublicUrl { file.Server = ret.PublicUrl } - file.Replication = replication - file.Collection = collection - file.DataCenter = dataCenter - file.Ttl = ttl - file.DiskType = diskType - results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) + file.Pref = pref + results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -88,8 +89,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F return results, nil } -func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) { - ret = make([]FilePart, len(fullPathFilenames)) +func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) { + ret = make([]*FilePart, len(fullPathFilenames)) for index, file := range fullPathFilenames { if ret[index], err = newFilePart(file); err != nil { return @@ -97,7 +98,8 @@ func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) { } return } -func newFilePart(fullPathFilename string) (ret FilePart, err error) { +func newFilePart(fullPathFilename string) (ret *FilePart, err error) { + ret = &FilePart{} fh, openErr := os.Open(fullPathFilename) if openErr != nil { glog.V(0).Info("Failed to open file: ", fullPathFilename) @@ -121,7 +123,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { +func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -145,13 +147,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw var ret *AssignResult var id string - if fi.DataCenter != "" { + if fi.Pref.DataCenter != "" { ar := &VolumeAssignRequest{ Count: uint64(chunks), - Replication: fi.Replication, - Collection: fi.Collection, - Ttl: fi.Ttl, - DiskType: fi.DiskType, + Replication: fi.Pref.Replication, + Collection: fi.Pref.Collection, + Ttl: fi.Pref.Ttl, + DiskType: fi.Pref.DiskType, } ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { @@ -159,13 +161,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw } } for i := int64(0); i < chunks; i++ { - if fi.DataCenter == "" { + if fi.Pref.DataCenter == "" { ar := &VolumeAssignRequest{ Count: 1, - Replication: fi.Replication, - Collection: fi.Collection, - Ttl: fi.Ttl, - DiskType: fi.DiskType, + Replication: fi.Pref.Replication, + Collection: fi.Pref.Collection, + Ttl: fi.Pref.Ttl, + DiskType: fi.Pref.DiskType, } ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { |
