aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2024-11-04 12:08:25 -0800
committerGitHub <noreply@github.com>2024-11-04 12:08:25 -0800
commitdc784bf217172b0e81eb4b3e5eb0e0e38b91849a (patch)
treee0795c9a6335571ca852bd84e720eb42d7b3538f /weed/shell
parentffe908371d4ddbc0cacbe8362a28bf56b322b349 (diff)
downloadseaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.tar.xz
seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.zip
merge current message queue code changes (#6201)
* listing files to convert to parquet * write parquet files * save logs into parquet files * pass by value * compact logs into parquet format * can skip existing files * refactor * refactor * fix compilation * when no partition found * refactor * add untested parquet file read * rename package * refactor * rename files * remove unused * add merged log read func * parquet wants to know the file size * rewind by time * pass in stop ts * add stop ts * adjust log * minor * adjust log * skip .parquet files when reading message logs * skip non message files * Update subscriber_record.go * send messages * skip message data with only ts * skip non log files * update parquet-go package * ensure a valid record type * add new field to a record type * Update read_parquet_to_log.go * fix parquet file name generation * separating reading parquet and logs * add key field * add skipped logs * use in memory cache * refactor * refactor * refactor * refactor, and change compact log * refactor * rename * refactor * fix format * prefix v to version directory
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_mq_topic_compact.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go
new file mode 100644
index 000000000..f1dee8662
--- /dev/null
+++ b/weed/shell/command_mq_topic_compact.go
@@ -0,0 +1,94 @@
+package shell
+
+import (
+ "flag"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandMqTopicCompact{})
+}
+
+type commandMqTopicCompact struct {
+}
+
+func (c *commandMqTopicCompact) Name() string {
+ return "mq.topic.compact"
+}
+
+func (c *commandMqTopicCompact) Help() string {
+ return `compact the topic storage into parquet format
+
+ Example:
+ mq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago>
+
+`
+}
+
+func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool {
+ return ResourceHeavy == tag
+}
+
+func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
+
+ // parse parameters
+ mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ namespace := mqCommand.String("namespace", "", "namespace name")
+ topicName := mqCommand.String("topic", "", "topic name")
+ timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ replication := mqCommand.String("replication", "", "replication type")
+ collection := mqCommand.String("collection", "", "optional collection name")
+ dataCenter := mqCommand.String("dataCenter", "", "optional data center name")
+ diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit")
+
+ if err := mqCommand.Parse(args); err != nil {
+ return err
+ }
+
+ storagePreference := &operation.StoragePreference{
+ Replication: *replication,
+ Collection: *collection,
+ DataCenter: *dataCenter,
+ DiskType: *diskType,
+ MaxMB: *maxMB,
+ }
+
+ // read topic configuration
+ fca := &filer_client.FilerClientAccessor{
+ GetFiler: func() pb.ServerAddress {
+ return commandEnv.option.FilerAddress
+ },
+ GetGrpcDialOption: func() grpc.DialOption {
+ return commandEnv.option.GrpcDialOption
+ },
+ }
+ t := topic.NewTopic(*namespace, *topicName)
+ topicConf, err := fca.ReadTopicConfFromFiler(t)
+ if err != nil {
+ return err
+ }
+
+ // get record type
+ recordType := topicConf.GetRecordType()
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
+ WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+
+ // compact the topic partition versions
+ if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil {
+ return err
+ }
+
+ return nil
+
+}