aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_mq_topic_compact.go
blob: 79d8a45f814f67d3d4f887954a3f045864537d47 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package shell

import (
	"flag"
	"fmt"
	"io"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func init() {
	Commands = append(Commands, &commandMqTopicCompact{})
}

// commandMqTopicCompact is the receiver type for the "mq.topic.compact" shell
// command. It has no fields; everything it needs arrives through the
// arguments of its methods.
type commandMqTopicCompact struct{}

// Name returns the command's name, "mq.topic.compact".
func (c *commandMqTopicCompact) Name() string {
	const commandName = "mq.topic.compact"
	return commandName
}

// Help returns the usage text for the command: a one-line summary, a blank
// line, and a tab-indented example invocation followed by a trailing blank
// line.
func (c *commandMqTopicCompact) Help() string {
	const (
		summary = "compact the topic storage into parquet format\n"
		example = "\tExample:\n" +
			"\t\tmq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago>\n"
	)
	return summary + "\n" + example + "\n"
}

// HasTag reports whether tag equals ResourceHeavy; every other tag yields
// false.
func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool {
	switch tag {
	case ResourceHeavy:
		return true
	default:
		return false
	}
}

// Do parses the command flags, reads the topic configuration from the filer,
// builds the record type used for compaction, and passes everything to
// logstore.CompactTopicPartitions.
//
// args are the raw command arguments. commandEnv supplies the filer address
// and gRPC dial option and is also passed through to the compaction call.
// writer is not used by this command.
//
// A flag parsing error is returned unchanged. An error is also returned when
// -namespace or -topic is empty. Failures from reading the topic configuration
// or from the compaction are returned wrapped with the step that failed.
func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {

	// parse parameters
	mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	namespace := mqCommand.String("namespace", "", "namespace name")
	topicName := mqCommand.String("topic", "", "topic name")
	timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
	replication := mqCommand.String("replication", "", "replication type")
	collection := mqCommand.String("collection", "", "optional collection name")
	dataCenter := mqCommand.String("dataCenter", "", "optional data center name")
	diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
	maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit")

	if err := mqCommand.Parse(args); err != nil {
		return err
	}

	// Both flags default to the empty string, so reject a missing value here
	// with a clear message instead of failing later on the filer lookup.
	if *namespace == "" {
		return fmt.Errorf("%s: -namespace is required", c.Name())
	}
	if *topicName == "" {
		return fmt.Errorf("%s: -topic is required", c.Name())
	}

	storagePreference := &operation.StoragePreference{
		Replication: *replication,
		Collection:  *collection,
		DataCenter:  *dataCenter,
		DiskType:    *diskType,
		MaxMB:       *maxMB,
	}

	// read topic configuration
	fca := filer_client.NewFilerClientAccessor(
		[]pb.ServerAddress{commandEnv.option.FilerAddress},
		commandEnv.option.GrpcDialOption,
	)
	t := topic.NewTopic(*namespace, *topicName)
	topicConf, err := fca.ReadTopicConfFromFiler(t)
	if err != nil {
		return fmt.Errorf("read configuration of topic %s.%s: %w", *namespace, *topicName, err)
	}

	// Start from the topic's message record type when one is configured, then
	// append the timestamp and key columns.
	// NOTE(review): when the topic has no message record type, recordType stays
	// nil and the builder receives a nil base - confirm that
	// schema.NewRecordTypeBuilder tolerates this.
	var recordType *schema_pb.RecordType
	if topicConf.GetMessageRecordType() != nil {
		recordType = topicConf.GetMessageRecordType()
	}
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
		WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// compact the topic partition versions
	if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil {
		return fmt.Errorf("compact topic %s.%s: %w", *namespace, *topicName, err)
	}

	return nil
}