aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/seaweedmq_handler_topics.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/integration/seaweedmq_handler_topics.go')
-rw-r--r--weed/mq/kafka/integration/seaweedmq_handler_topics.go315
1 files changed, 315 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/seaweedmq_handler_topics.go b/weed/mq/kafka/integration/seaweedmq_handler_topics.go
new file mode 100644
index 000000000..b635b40af
--- /dev/null
+++ b/weed/mq/kafka/integration/seaweedmq_handler_topics.go
@@ -0,0 +1,315 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// CreateTopic creates a new topic in both Kafka registry and SeaweedMQ
+func (h *SeaweedMQHandler) CreateTopic(name string, partitions int32) error {
+ return h.CreateTopicWithSchema(name, partitions, nil)
+}
+
+// CreateTopicWithSchema creates a topic with optional value schema
+func (h *SeaweedMQHandler) CreateTopicWithSchema(name string, partitions int32, recordType *schema_pb.RecordType) error {
+ return h.CreateTopicWithSchemas(name, partitions, nil, recordType)
+}
+
+// CreateTopicWithSchemas creates a topic with optional key and value schemas
+func (h *SeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
+ // Check if topic already exists in filer
+ if h.checkTopicInFiler(name) {
+ return fmt.Errorf("topic %s already exists", name)
+ }
+
+ // Create SeaweedMQ topic reference
+ seaweedTopic := &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: name,
+ }
+
+ // Configure topic with SeaweedMQ broker via gRPC
+ if len(h.brokerAddresses) > 0 {
+ brokerAddress := h.brokerAddresses[0] // Use first available broker
+ glog.V(1).Infof("Configuring topic %s with broker %s", name, brokerAddress)
+
+ // Load security configuration for broker connection
+ util.LoadSecurityConfiguration()
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq")
+
+ err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ // Convert dual schemas to flat schema format
+ var flatSchema *schema_pb.RecordType
+ var keyColumns []string
+ if keyRecordType != nil || valueRecordType != nil {
+ flatSchema, keyColumns = schema.CombineFlatSchemaFromKeyValue(keyRecordType, valueRecordType)
+ }
+
+ _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: seaweedTopic,
+ PartitionCount: partitions,
+ MessageRecordType: flatSchema,
+ KeyColumns: keyColumns,
+ })
+ if err != nil {
+ return fmt.Errorf("configure topic with broker: %w", err)
+ }
+ glog.V(1).Infof("successfully configured topic %s with broker", name)
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("failed to configure topic %s with broker %s: %w", name, brokerAddress, err)
+ }
+ } else {
+ glog.Warningf("No brokers available - creating topic %s in gateway memory only (testing mode)", name)
+ }
+
+ // Topic is now stored in filer only via SeaweedMQ broker
+ // No need to create in-memory topic info structure
+
+ // Offset management now handled directly by SMQ broker - no initialization needed
+
+ // Invalidate cache after successful topic creation
+ h.InvalidateTopicExistsCache(name)
+
+ glog.V(1).Infof("Topic %s created successfully with %d partitions", name, partitions)
+ return nil
+}
+
+// CreateTopicWithRecordType creates a topic with flat schema and key columns
+func (h *SeaweedMQHandler) CreateTopicWithRecordType(name string, partitions int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
+ // Check if topic already exists in filer
+ if h.checkTopicInFiler(name) {
+ return fmt.Errorf("topic %s already exists", name)
+ }
+
+ // Create SeaweedMQ topic reference
+ seaweedTopic := &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: name,
+ }
+
+ // Configure topic with SeaweedMQ broker via gRPC
+ if len(h.brokerAddresses) > 0 {
+ brokerAddress := h.brokerAddresses[0] // Use first available broker
+ glog.V(1).Infof("Configuring topic %s with broker %s", name, brokerAddress)
+
+ // Load security configuration for broker connection
+ util.LoadSecurityConfiguration()
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq")
+
+ err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: seaweedTopic,
+ PartitionCount: partitions,
+ MessageRecordType: flatSchema,
+ KeyColumns: keyColumns,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to configure topic: %w", err)
+ }
+
+ glog.V(1).Infof("successfully configured topic %s with broker", name)
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+ } else {
+ glog.Warningf("No broker addresses configured, topic %s not created in SeaweedMQ", name)
+ }
+
+ // Topic is now stored in filer only via SeaweedMQ broker
+ // No need to create in-memory topic info structure
+
+ glog.V(1).Infof("Topic %s created successfully with %d partitions using flat schema", name, partitions)
+ return nil
+}
+
+// DeleteTopic removes a topic from both Kafka registry and SeaweedMQ
+func (h *SeaweedMQHandler) DeleteTopic(name string) error {
+ // Check if topic exists in filer
+ if !h.checkTopicInFiler(name) {
+ return fmt.Errorf("topic %s does not exist", name)
+ }
+
+ // Get topic info to determine partition count for cleanup
+ topicInfo, exists := h.GetTopicInfo(name)
+ if !exists {
+ return fmt.Errorf("topic %s info not found", name)
+ }
+
+ // Close all publisher sessions for this topic
+ for partitionID := int32(0); partitionID < topicInfo.Partitions; partitionID++ {
+ if h.brokerClient != nil {
+ h.brokerClient.ClosePublisher(name, partitionID)
+ }
+ }
+
+ // Topic removal from filer would be handled by SeaweedMQ broker
+ // No in-memory cache to clean up
+
+ // Offset management handled by SMQ broker - no cleanup needed
+
+ return nil
+}
+
+// TopicExists checks if a topic exists in SeaweedMQ broker (includes in-memory topics)
+// Uses a 5-second cache to reduce broker queries
+func (h *SeaweedMQHandler) TopicExists(name string) bool {
+ // Check cache first
+ h.topicExistsCacheMu.RLock()
+ if entry, found := h.topicExistsCache[name]; found {
+ if time.Now().Before(entry.expiresAt) {
+ h.topicExistsCacheMu.RUnlock()
+ return entry.exists
+ }
+ }
+ h.topicExistsCacheMu.RUnlock()
+
+ // Cache miss or expired - query broker
+
+ var exists bool
+ // Check via SeaweedMQ broker (includes in-memory topics)
+ if h.brokerClient != nil {
+ var err error
+ exists, err = h.brokerClient.TopicExists(name)
+ if err != nil {
+ // Don't cache errors
+ return false
+ }
+ } else {
+ // Return false if broker is unavailable
+ return false
+ }
+
+ // Update cache
+ h.topicExistsCacheMu.Lock()
+ h.topicExistsCache[name] = &topicExistsCacheEntry{
+ exists: exists,
+ expiresAt: time.Now().Add(h.topicExistsCacheTTL),
+ }
+ h.topicExistsCacheMu.Unlock()
+
+ return exists
+}
+
+// InvalidateTopicExistsCache removes a topic from the existence cache
+// Should be called after creating or deleting a topic
+func (h *SeaweedMQHandler) InvalidateTopicExistsCache(name string) {
+ h.topicExistsCacheMu.Lock()
+ delete(h.topicExistsCache, name)
+ h.topicExistsCacheMu.Unlock()
+}
+
+// GetTopicInfo returns information about a topic from broker
+func (h *SeaweedMQHandler) GetTopicInfo(name string) (*KafkaTopicInfo, bool) {
+ // Get topic configuration from broker
+ if h.brokerClient != nil {
+ config, err := h.brokerClient.GetTopicConfiguration(name)
+ if err == nil && config != nil {
+ topicInfo := &KafkaTopicInfo{
+ Name: name,
+ Partitions: config.PartitionCount,
+ CreatedAt: config.CreatedAtNs,
+ }
+ return topicInfo, true
+ }
+ glog.V(2).Infof("Failed to get topic configuration for %s from broker: %v", name, err)
+ }
+
+ // Fallback: check if topic exists in filer (for backward compatibility)
+ if !h.checkTopicInFiler(name) {
+ return nil, false
+ }
+
+ // Return default info if broker query failed but topic exists in filer
+ topicInfo := &KafkaTopicInfo{
+ Name: name,
+ Partitions: 1, // Default to 1 partition if broker query failed
+ CreatedAt: 0,
+ }
+
+ return topicInfo, true
+}
+
+// ListTopics returns all topic names from SeaweedMQ broker (includes in-memory topics)
+func (h *SeaweedMQHandler) ListTopics() []string {
+ // Get topics from SeaweedMQ broker (includes in-memory topics)
+ if h.brokerClient != nil {
+ topics, err := h.brokerClient.ListTopics()
+ if err == nil {
+ return topics
+ }
+ }
+
+ // Return empty list if broker is unavailable
+ return []string{}
+}
+
+// checkTopicInFiler checks if a topic exists in the filer
+func (h *SeaweedMQHandler) checkTopicInFiler(topicName string) bool {
+ if h.filerClientAccessor == nil {
+ return false
+ }
+
+ var exists bool
+ h.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: "/topics/kafka",
+ Name: topicName,
+ }
+
+ _, err := client.LookupDirectoryEntry(context.Background(), request)
+ exists = (err == nil)
+ return nil // Don't propagate error, just check existence
+ })
+
+ return exists
+}
+
+// listTopicsFromFiler lists all topics from the filer
+func (h *SeaweedMQHandler) listTopicsFromFiler() []string {
+ if h.filerClientAccessor == nil {
+ return []string{}
+ }
+
+ var topics []string
+
+ h.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.ListEntriesRequest{
+ Directory: "/topics/kafka",
+ }
+
+ stream, err := client.ListEntries(context.Background(), request)
+ if err != nil {
+ return nil // Don't propagate error, just return empty list
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ break // End of stream or error
+ }
+
+ if resp.Entry != nil && resp.Entry.IsDirectory {
+ topics = append(topics, resp.Entry.Name)
+ } else if resp.Entry != nil {
+ }
+ }
+ return nil
+ })
+
+ return topics
+}