aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/topic.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/topic.go')
-rw-r--r--weed/mq/topic/topic.go65
1 files changed, 65 insertions, 0 deletions
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 56b9cda5f..6fb0f0ce9 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -5,11 +5,14 @@ import (
"context"
"errors"
"fmt"
+ "strings"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
jsonpb "google.golang.org/protobuf/encoding/protojson"
)
@@ -102,3 +105,65 @@ func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.Con
}
return nil
}
+
+// DiscoverPartitions discovers all partition directories for a topic by scanning the filesystem
+// This centralizes partition discovery logic used across query engine, shell commands, etc.
+func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) {
+ var partitionPaths []string
+
+ // Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
+ err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
+ if !versionEntry.IsDirectory {
+ return nil // Skip non-directories
+ }
+
+ // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
+ if !IsValidVersionDirectory(versionEntry.Name) {
+ // Skip directories that don't match the version format
+ return nil
+ }
+
+ // Scan partition directories within this version (e.g., 0000-0630)
+ versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name)
+ return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
+ if !partitionEntry.IsDirectory {
+ return nil // Skip non-directories
+ }
+
+ // Parse partition boundary from directory name (e.g., "0000-0630")
+ if !IsValidPartitionDirectory(partitionEntry.Name) {
+ return nil // Skip invalid partition names
+ }
+
+ // Add this partition path to the list
+ partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name)
+ partitionPaths = append(partitionPaths, partitionPath)
+ return nil
+ })
+ })
+
+ return partitionPaths, err
+}
+
+// IsValidVersionDirectory checks if a directory name matches the topic version format
+// Format: v2025-09-01-07-16-34
+func IsValidVersionDirectory(name string) bool {
+ if !strings.HasPrefix(name, "v") || len(name) != 20 {
+ return false
+ }
+
+ // Try to parse the timestamp part
+ timestampStr := name[1:] // Remove 'v' prefix
+ _, err := time.Parse("2006-01-02-15-04-05", timestampStr)
+ return err == nil
+}
+
+// IsValidPartitionDirectory checks if a directory name matches the partition boundary format
+// Format: 0000-0630 (rangeStart-rangeStop)
+func IsValidPartitionDirectory(name string) bool {
+ // Use existing ParsePartitionBoundary function to validate
+ start, stop := ParsePartitionBoundary(name)
+
+ // Valid partition ranges should have start < stop (and not both be 0, which indicates parse error)
+ return start < stop && start >= 0
+}