aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/flat_schema_utils.go
blob: 93a241cecef1d14e411d453f0dce38fbf25f218f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package schema

import (
	"fmt"
	"sort"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// SplitFlatSchemaToKeyValue partitions a flat RecordType into a key RecordType
// and a value RecordType.
//
// Fields whose Name appears in keyColumns go to the key schema; every other
// field goes to the value schema. Within each output, fields keep the relative
// order they have in flatSchema and FieldIndex is renumbered starting at 0.
//
// A nil flatSchema yields (nil, nil, nil). A side that receives no fields is
// returned as nil rather than as an empty RecordType. If any entry of
// keyColumns matches no field name, both schemas are nil and the error lists
// the unmatched entries.
func SplitFlatSchemaToKeyValue(flatSchema *schema_pb.RecordType, keyColumns []string) (*schema_pb.RecordType, *schema_pb.RecordType, error) {
	if flatSchema == nil {
		return nil, nil, nil
	}

	// matched has one entry per requested key column; the value records
	// whether a field with that name was actually seen in the schema.
	matched := make(map[string]bool, len(keyColumns))
	for _, col := range keyColumns {
		matched[col] = false
	}

	var keyFields []*schema_pb.Field
	var valueFields []*schema_pb.Field

	for _, field := range flatSchema.Fields {
		// Build a new Field so the per-side FieldIndex can be assigned without
		// touching the input. Type is carried over by pointer, not deep-copied.
		split := &schema_pb.Field{
			Name:       field.Name,
			Type:       field.Type,
			IsRepeated: field.IsRepeated,
			IsRequired: field.IsRequired,
		}
		if _, isKey := matched[field.Name]; isKey {
			matched[field.Name] = true
			split.FieldIndex = int32(len(keyFields))
			keyFields = append(keyFields, split)
		} else {
			split.FieldIndex = int32(len(valueFields))
			valueFields = append(valueFields, split)
		}
	}

	// Validate from the matched record rather than by comparing counts: a
	// schema with repeated field names could otherwise produce equal counts
	// while a requested column is still absent.
	var missingCols []string
	for _, col := range keyColumns {
		if !matched[col] {
			missingCols = append(missingCols, col)
		}
	}
	if len(missingCols) > 0 {
		return nil, nil, fmt.Errorf("key columns not found in schema: %v", missingCols)
	}

	var keyRecordType *schema_pb.RecordType
	if len(keyFields) > 0 {
		keyRecordType = &schema_pb.RecordType{Fields: keyFields}
	}

	var valueRecordType *schema_pb.RecordType
	if len(valueFields) > 0 {
		valueRecordType = &schema_pb.RecordType{Fields: valueFields}
	}

	return keyRecordType, valueRecordType, nil
}

// CombineFlatSchemaFromKeyValue merges a key schema and a value schema into a
// single flat RecordType and reports which columns came from the key schema.
//
// Key fields are emitted first, then value fields, with FieldIndex renumbered
// sequentially across the merged list. Either input may be nil. The second
// result holds the key field names in key-schema order. A value field whose
// name equals a key field name is emitted with a "value_" prefix. When neither
// schema contributes a field, the returned RecordType is nil.
func CombineFlatSchemaFromKeyValue(keySchema *schema_pb.RecordType, valueSchema *schema_pb.RecordType) (*schema_pb.RecordType, []string) {
	var (
		merged   []*schema_pb.Field
		keyNames []string
	)

	// add appends a copy of src under the given name at the next free index.
	add := func(src *schema_pb.Field, name string) {
		merged = append(merged, &schema_pb.Field{
			Name:       name,
			FieldIndex: int32(len(merged)),
			Type:       src.Type,
			IsRepeated: src.IsRepeated,
			IsRequired: src.IsRequired,
		})
	}

	if keySchema != nil {
		for _, f := range keySchema.Fields {
			add(f, f.Name)
			keyNames = append(keyNames, f.Name)
		}
	}

	if valueSchema != nil {
		// Set of key names for detecting value fields that clash with a key.
		isKey := make(map[string]struct{}, len(keyNames))
		for _, n := range keyNames {
			isKey[n] = struct{}{}
		}

		for _, f := range valueSchema.Fields {
			name := f.Name
			if _, clash := isKey[name]; clash {
				// Not expected in well-formed schemas; disambiguate instead of failing.
				name = "value_" + name
			}
			add(f, name)
		}
	}

	var combined *schema_pb.RecordType
	if len(merged) > 0 {
		combined = &schema_pb.RecordType{Fields: merged}
	}
	return combined, keyNames
}

// ExtractKeyColumnsFromCombinedSchema infers the key columns of a combined
// schema in which key fields carry a "key_" name prefix (the convention used
// by CreateCombinedRecordType).
//
// Every field is copied into the returned flat schema in its original order
// with FieldIndex renumbered from 0; key fields have the "key_" prefix removed
// from their name. The returned key column names are the de-prefixed names,
// sorted lexicographically, so their order may differ from the field order.
// A nil combinedSchema yields (nil, nil); a schema with no fields yields a nil
// flat schema.
func ExtractKeyColumnsFromCombinedSchema(combinedSchema *schema_pb.RecordType) (flatSchema *schema_pb.RecordType, keyColumns []string) {
	if combinedSchema == nil {
		return nil, nil
	}

	const keyPrefix = "key_"

	var fields []*schema_pb.Field
	for _, src := range combinedSchema.Fields {
		name := src.Name
		if strings.HasPrefix(name, keyPrefix) {
			// Key field: drop the marker prefix and remember the bare name.
			name = name[len(keyPrefix):]
			keyColumns = append(keyColumns, name)
		}
		fields = append(fields, &schema_pb.Field{
			Name:       name,
			FieldIndex: int32(len(fields)),
			Type:       src.Type,
			IsRepeated: src.IsRepeated,
			IsRequired: src.IsRequired,
		})
	}

	// Key names are reported in sorted order rather than schema order.
	sort.Strings(keyColumns)

	if len(fields) > 0 {
		flatSchema = &schema_pb.RecordType{Fields: fields}
	}
	return flatSchema, keyColumns
}

// ValidateKeyColumns reports whether every name in keyColumns matches the Name
// of some field in schema.
//
// It returns nil when schema is nil, when keyColumns is empty, or when every
// key column is present. Otherwise it returns an error listing the absent
// column names in the order they appear in keyColumns.
func ValidateKeyColumns(schema *schema_pb.RecordType, keyColumns []string) error {
	// Nothing to validate without a schema or without any key columns.
	if schema == nil || len(keyColumns) == 0 {
		return nil
	}

	known := make(map[string]struct{}, len(schema.Fields))
	for _, f := range schema.Fields {
		known[f.Name] = struct{}{}
	}

	var absent []string
	for _, col := range keyColumns {
		if _, ok := known[col]; !ok {
			absent = append(absent, col)
		}
	}

	if len(absent) == 0 {
		return nil
	}
	return fmt.Errorf("key columns not found in schema: %v", absent)
}