// Source: weed/mq/topic/topic.go (blob 6fb0f0ce9c2a8bf37a38f8bc65d8595006838d8d)

package topic

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	jsonpb "google.golang.org/protobuf/encoding/protojson"
)

// Topic identifies a message-queue topic by the pair (Namespace, Name).
// It is a small value type; the methods in this file use value receivers.
type Topic struct {
	Namespace string // namespace the topic lives in
	Name      string // topic name within that namespace
}

// NewTopic returns the Topic identified by the given namespace and name.
func NewTopic(namespace string, name string) Topic {
	var t Topic
	t.Namespace = namespace
	t.Name = name
	return t
}
// FromPbTopic converts a protobuf topic into a Topic by copying its
// Namespace and Name fields. The fields are read directly, so topic must
// not be nil.
func FromPbTopic(topic *schema_pb.Topic) Topic {
	var t Topic
	t.Namespace, t.Name = topic.Namespace, topic.Name
	return t
}

// ToPbTopic returns a newly allocated protobuf topic carrying the same
// Namespace and Name as t.
func (t Topic) ToPbTopic() *schema_pb.Topic {
	pbTopic := new(schema_pb.Topic)
	pbTopic.Namespace = t.Namespace
	pbTopic.Name = t.Name
	return pbTopic
}

// String renders the topic as "<namespace>.<name>".
func (t Topic) String() string {
	namespace, name := t.Namespace, t.Name
	return fmt.Sprintf("%s.%s", namespace, name)
}

// Dir returns the topic's directory path, formed as
// "<filer.TopicsDir>/<namespace>/<name>".
func (t Topic) Dir() string {
	root, namespace, name := filer.TopicsDir, t.Namespace, t.Name
	return fmt.Sprintf("%s/%s/%s", root, namespace, name)
}

// ReadConfFile loads the topic configuration file (filer.TopicConfFile) from
// the topic directory and decodes its JSON content.
//
// A not-found error (one matching filer_pb.ErrNotFound) is returned as-is so
// callers can test for it with errors.Is; every other read or decode failure
// is wrapped with the topic for context.
func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, error) {
	raw, err := filer.ReadInsideFiler(client, t.Dir(), filer.TopicConfFile)
	switch {
	case err == nil:
		// fall through to decoding below
	case errors.Is(err, filer_pb.ErrNotFound):
		return nil, err
	default:
		return nil, fmt.Errorf("read topic.conf of %v: %w", t, err)
	}

	// Decode the stored JSON into the topic configuration message.
	parsed := new(mq_pb.ConfigureTopicResponse)
	if decodeErr := jsonpb.Unmarshal(raw, parsed); decodeErr != nil {
		return nil, fmt.Errorf("unmarshal topic %v conf: %w", t, decodeErr)
	}
	return parsed, nil
}

// ReadConfFileWithMetadata looks up the topic configuration entry and returns
// the decoded configuration together with the entry's creation and
// modification times in nanoseconds (in that order).
//
// Both timestamps are 0 when the entry carries no attributes. A not-found
// error (one matching filer_pb.ErrNotFound) is returned as-is; other lookup
// or decode failures are wrapped with the topic for context.
func (t Topic) ReadConfFileWithMetadata(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, int64, int64, error) {
	// Entry attribute times are treated as seconds and scaled to nanoseconds.
	const nanosPerSecond int64 = 1e9

	// A directory-entry lookup yields both the inline content and the attributes.
	lookup, err := filer_pb.LookupEntry(context.Background(), client, &filer_pb.LookupDirectoryEntryRequest{
		Directory: t.Dir(),
		Name:      filer.TopicConfFile,
	})
	switch {
	case err == nil:
		// proceed with the entry
	case errors.Is(err, filer_pb.ErrNotFound):
		return nil, 0, 0, err
	default:
		return nil, 0, 0, fmt.Errorf("lookup topic.conf of %v: %w", t, err)
	}
	entry := lookup.Entry

	// Collect file timestamps when attributes are present.
	var createdAtNs, modifiedAtNs int64
	if attrs := entry.Attributes; attrs != nil {
		createdAtNs = attrs.Crtime * nanosPerSecond
		modifiedAtNs = attrs.Mtime * nanosPerSecond
	}

	// Decode the stored JSON into the topic configuration message.
	parsed := new(mq_pb.ConfigureTopicResponse)
	if decodeErr := jsonpb.Unmarshal(entry.Content, parsed); decodeErr != nil {
		return nil, 0, 0, fmt.Errorf("unmarshal topic %v conf: %w", t, decodeErr)
	}

	return parsed, createdAtNs, modifiedAtNs, nil
}

// WriteConfFile serializes conf with filer.ProtoToText and saves the result
// as filer.TopicConfFile inside the topic directory.
//
// Serialization errors are reported instead of being dropped, so a failed
// marshal can no longer persist an empty or truncated configuration file.
// Both serialization and save failures are wrapped with the topic for context.
func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.ConfigureTopicResponse) error {
	var buf bytes.Buffer
	if err := filer.ProtoToText(&buf, conf); err != nil {
		return fmt.Errorf("marshal topic %v conf: %w", t, err)
	}
	if err := filer.SaveInsideFiler(client, t.Dir(), filer.TopicConfFile, buf.Bytes()); err != nil {
		return fmt.Errorf("save topic %v conf: %w", t, err)
	}
	return nil
}

// DiscoverPartitions walks the topic directory and returns the full path of
// every partition directory found beneath it.
//
// The layout scanned is two levels deep:
//
//	<topic dir>/<version dir>/<partition dir>
//
// where version directories must satisfy IsValidVersionDirectory (e.g.
// "v2025-09-01-07-16-34") and partition directories must satisfy
// IsValidPartitionDirectory (e.g. "0000-0630"). Files and directories with
// non-matching names are ignored at both levels.
//
// The paths collected so far are returned together with any listing error.
func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) {
	topicDir := t.Dir()
	var found []string

	// collectPartitions builds the per-entry callback that records valid
	// partition directories located under versionDir.
	collectPartitions := func(versionDir string) func(*filer_pb.Entry, bool) error {
		return func(entry *filer_pb.Entry, _ bool) error {
			if entry.IsDirectory && IsValidPartitionDirectory(entry.Name) {
				found = append(found, fmt.Sprintf("%s/%s", versionDir, entry.Name))
			}
			return nil
		}
	}

	// First level: version directories directly under the topic directory.
	err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(topicDir), "", func(entry *filer_pb.Entry, _ bool) error {
		if !entry.IsDirectory || !IsValidVersionDirectory(entry.Name) {
			return nil
		}

		// Second level: partition directories inside this version.
		versionDir := fmt.Sprintf("%s/%s", topicDir, entry.Name)
		return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", collectPartitions(versionDir))
	})

	return found, err
}

// IsValidVersionDirectory reports whether name has the topic version
// directory format: the letter "v" followed by a timestamp laid out as
// year-month-day-hour-minute-second, for example "v2025-09-01-07-16-34".
func IsValidVersionDirectory(name string) bool {
	const (
		prefix = "v"
		layout = "2006-01-02-15-04-05"
	)

	// The name must be exactly the prefix plus a layout-sized timestamp.
	if len(name) != len(prefix)+len(layout) {
		return false
	}
	if !strings.HasPrefix(name, prefix) {
		return false
	}

	// The remainder must be a real calendar timestamp.
	if _, err := time.Parse(layout, name[len(prefix):]); err != nil {
		return false
	}
	return true
}

// IsValidPartitionDirectory reports whether name has the partition boundary
// directory format "<rangeStart>-<rangeStop>", for example "0000-0630".
//
// Parsing is delegated to ParsePartitionBoundary. A name is accepted only
// when the parsed start is non-negative and strictly less than the parsed
// stop; this also rejects the 0/0 pair, which presumably is what
// ParsePartitionBoundary yields on a parse failure (confirm against that
// function).
func IsValidPartitionDirectory(name string) bool {
	rangeStart, rangeStop := ParsePartitionBoundary(name)
	if rangeStart < 0 {
		return false
	}
	return rangeStart < rangeStop
}