diff options
Diffstat (limited to 'weed/shell/command_mq_topic_compact.go')
| -rw-r--r-- | weed/shell/command_mq_topic_compact.go | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go index f1dee8662..79d8a45f8 100644 --- a/weed/shell/command_mq_topic_compact.go +++ b/weed/shell/command_mq_topic_compact.go @@ -2,15 +2,16 @@ package shell import ( "flag" + "io" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer_client" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" - "google.golang.org/grpc" - "io" - "time" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) func init() { @@ -63,22 +64,22 @@ func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer } // read topic configuration - fca := &filer_client.FilerClientAccessor{ - GetFiler: func() pb.ServerAddress { - return commandEnv.option.FilerAddress - }, - GetGrpcDialOption: func() grpc.DialOption { - return commandEnv.option.GrpcDialOption - }, - } + fca := filer_client.NewFilerClientAccessor( + []pb.ServerAddress{commandEnv.option.FilerAddress}, + commandEnv.option.GrpcDialOption, + ) t := topic.NewTopic(*namespace, *topicName) topicConf, err := fca.ReadTopicConfFromFiler(t) if err != nil { return err } - // get record type - recordType := topicConf.GetRecordType() + // get record type - prefer flat schema if available + var recordType *schema_pb.RecordType + if topicConf.GetMessageRecordType() != nil { + // New flat schema format - use directly + recordType = topicConf.GetMessageRecordType() + } recordType = schema.NewRecordTypeBuilder(recordType). WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64). WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes). |
