diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2024-11-04 12:08:25 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-11-04 12:08:25 -0800 |
| commit | dc784bf217172b0e81eb4b3e5eb0e0e38b91849a (patch) | |
| tree | e0795c9a6335571ca852bd84e720eb42d7b3538f /weed/shell | |
| parent | ffe908371d4ddbc0cacbe8362a28bf56b322b349 (diff) | |
| download | seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.tar.xz seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.zip | |
merge current message queue code changes (#6201)
* listing files to convert to parquet
* write parquet files
* save logs into parquet files
* pass by value
* compact logs into parquet format
* can skip existing files
* refactor
* refactor
* fix compilation
* when no partition found
* refactor
* add untested parquet file read
* rename package
* refactor
* rename files
* remove unused
* add merged log read func
* parquet wants to know the file size
* rewind by time
* pass in stop ts
* add stop ts
* adjust log
* minor
* adjust log
* skip .parquet files when reading message logs
* skip non message files
* Update subscriber_record.go
* send messages
* skip message data with only ts
* skip non log files
* update parquet-go package
* ensure a valid record type
* add new field to a record type
* Update read_parquet_to_log.go
* fix parquet file name generation
* separating reading parquet and logs
* add key field
* add skipped logs
* use in memory cache
* refactor
* refactor
* refactor
* refactor, and change compact log
* refactor
* rename
* refactor
* fix format
* prefix v to version directory
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_mq_topic_compact.go | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go new file mode 100644 index 000000000..f1dee8662 --- /dev/null +++ b/weed/shell/command_mq_topic_compact.go @@ -0,0 +1,94 @@ +package shell + +import ( + "flag" + "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/mq/logstore" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" + "io" + "time" +) + +func init() { + Commands = append(Commands, &commandMqTopicCompact{}) +} + +type commandMqTopicCompact struct { +} + +func (c *commandMqTopicCompact) Name() string { + return "mq.topic.compact" +} + +func (c *commandMqTopicCompact) Help() string { + return `compact the topic storage into parquet format + + Example: + mq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago> + +` +} + +func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool { + return ResourceHeavy == tag +} + +func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { + + // parse parameters + mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + namespace := mqCommand.String("namespace", "", "namespace name") + topicName := mqCommand.String("topic", "", "topic name") + timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + replication := mqCommand.String("replication", "", "replication type") + collection := mqCommand.String("collection", "", "optional collection name") + dataCenter := mqCommand.String("dataCenter", "", "optional data center name") + diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") + maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit") + + if err := mqCommand.Parse(args); err != nil { + return err + } + + storagePreference := &operation.StoragePreference{ + Replication: *replication, + Collection: *collection, + DataCenter: *dataCenter, + DiskType: *diskType, + MaxMB: *maxMB, + } + + // read topic configuration + fca := &filer_client.FilerClientAccessor{ + GetFiler: func() pb.ServerAddress { + return commandEnv.option.FilerAddress + }, + GetGrpcDialOption: func() grpc.DialOption { + return commandEnv.option.GrpcDialOption + }, + } + t := topic.NewTopic(*namespace, *topicName) + topicConf, err := fca.ReadTopicConfFromFiler(t) + if err != nil { + return err + } + + // get record type + recordType := topicConf.GetRecordType() + recordType = schema.NewRecordTypeBuilder(recordType). + WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64). + WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes). + RecordTypeEnd() + + // compact the topic partition versions + if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil { + return err + } + + return nil + +} |
