diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2024-11-04 12:08:25 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-04 12:08:25 -0800 |
| commit | dc784bf217172b0e81eb4b3e5eb0e0e38b91849a (patch) | |
| tree | e0795c9a6335571ca852bd84e720eb42d7b3538f /weed/command/upload.go | |
| parent | ffe908371d4ddbc0cacbe8362a28bf56b322b349 (diff) | |
| download | seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.tar.xz seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.zip | |
merge current message queue code changes (#6201)
* listing files to convert to parquet
* write parquet files
* save logs into parquet files
* pass by value
* compact logs into parquet format
* can skip existing files
* refactor
* refactor
* fix compilation
* when no partition found
* refactor
* add untested parquet file read
* rename package
* refactor
* rename files
* remove unused
* add merged log read func
* parquet wants to know the file size
* rewind by time
* pass in stop ts
* add stop ts
* adjust log
* minor
* adjust log
* skip .parquet files when reading message logs
* skip non message files
* Update subscriber_record.go
* send messages
* skip message data with only ts
* skip non log files
* update parquet-go package
* ensure a valid record type
* add new field to a record type
* Update read_parquet_to_log.go
* fix parquet file name generation
* separating reading parquet and logs
* add key field
* add skipped logs
* use in memory cache
* refactor
* refactor
* refactor
* refactor, and change compact log
* refactor
* rename
* refactor
* fix format
* prefix v to version directory
Diffstat (limited to 'weed/command/upload.go')
| -rw-r--r-- | weed/command/upload.go | 18 |
1 files changed, 16 insertions, 2 deletions
diff --git a/weed/command/upload.go b/weed/command/upload.go index 7135a707a..9f9ac1107 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -97,7 +97,14 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{ + Replication: *upload.replication, + Collection: *upload.collection, + DataCenter: *upload.dataCenter, + Ttl: *upload.ttl, + DiskType: *upload.diskType, + MaxMB: *upload.maxMB, + }, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -119,7 +126,14 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) return false } - results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{ + Replication: *upload.replication, + Collection: *upload.collection, + DataCenter: *upload.dataCenter, + Ttl: *upload.ttl, + DiskType: *upload.diskType, + MaxMB: *upload.maxMB, + }, *upload.usePublicUrl) if err != nil { fmt.Println(err.Error()) return false |
