aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_mq_topic_compact.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_mq_topic_compact.go')
-rw-r--r--weed/shell/command_mq_topic_compact.go27
1 files changed, 14 insertions, 13 deletions
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go
index f1dee8662..79d8a45f8 100644
--- a/weed/shell/command_mq_topic_compact.go
+++ b/weed/shell/command_mq_topic_compact.go
@@ -2,15 +2,16 @@ package shell
import (
"flag"
+ "io"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/filer_client"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
- "google.golang.org/grpc"
- "io"
- "time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func init() {
@@ -63,22 +64,22 @@ func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer
}
// read topic configuration
- fca := &filer_client.FilerClientAccessor{
- GetFiler: func() pb.ServerAddress {
- return commandEnv.option.FilerAddress
- },
- GetGrpcDialOption: func() grpc.DialOption {
- return commandEnv.option.GrpcDialOption
- },
- }
+ fca := filer_client.NewFilerClientAccessor(
+ []pb.ServerAddress{commandEnv.option.FilerAddress},
+ commandEnv.option.GrpcDialOption,
+ )
t := topic.NewTopic(*namespace, *topicName)
topicConf, err := fca.ReadTopicConfFromFiler(t)
if err != nil {
return err
}
- // get record type
- recordType := topicConf.GetRecordType()
+ // get record type - prefer flat schema if available
+ var recordType *schema_pb.RecordType
+ if topicConf.GetMessageRecordType() != nil {
+ // New flat schema format - use directly
+ recordType = topicConf.GetMessageRecordType()
+ }
recordType = schema.NewRecordTypeBuilder(recordType).
WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).