diff options
Diffstat (limited to 'weed/mq/topic/topic.go')
| -rw-r--r-- | weed/mq/topic/topic.go | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go index 56b9cda5f..6fb0f0ce9 100644 --- a/weed/mq/topic/topic.go +++ b/weed/mq/topic/topic.go @@ -5,11 +5,14 @@ import ( "context" "errors" "fmt" + "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" jsonpb "google.golang.org/protobuf/encoding/protojson" ) @@ -102,3 +105,65 @@ func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.Con } return nil } + +// DiscoverPartitions discovers all partition directories for a topic by scanning the filesystem +// This centralizes partition discovery logic used across query engine, shell commands, etc. +func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) { + var partitionPaths []string + + // Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34) + err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error { + if !versionEntry.IsDirectory { + return nil // Skip non-directories + } + + // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34") + if !IsValidVersionDirectory(versionEntry.Name) { + // Skip directories that don't match the version format + return nil + } + + // Scan partition directories within this version (e.g., 0000-0630) + versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name) + return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error { + if !partitionEntry.IsDirectory { + return nil // Skip non-directories + } + + // Parse partition boundary from directory name (e.g., "0000-0630") + if !IsValidPartitionDirectory(partitionEntry.Name) { + return nil // Skip invalid partition names + } + + // Add this partition path to the list + partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name) + partitionPaths = append(partitionPaths, partitionPath) + return nil + }) + }) + + return partitionPaths, err +} + +// IsValidVersionDirectory checks if a directory name matches the topic version format +// Format: v2025-09-01-07-16-34 +func IsValidVersionDirectory(name string) bool { + if !strings.HasPrefix(name, "v") || len(name) != 20 { + return false + } + + // Try to parse the timestamp part + timestampStr := name[1:] // Remove 'v' prefix + _, err := time.Parse("2006-01-02-15-04-05", timestampStr) + return err == nil +} + +// IsValidPartitionDirectory checks if a directory name matches the partition boundary format +// Format: 0000-0630 (rangeStart-rangeStop) +func IsValidPartitionDirectory(name string) bool { + // Use existing ParsePartitionBoundary function to validate + start, stop := ParsePartitionBoundary(name) + + // Valid partition ranges should have start < stop (and not both be 0, which indicates parse error) + return start < stop && start >= 0 +} |
