aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_lookup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_lookup.go')
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go146
1 files changed, 133 insertions, 13 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index d2dfcaa41..680fba87b 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -42,7 +43,10 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
}
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
+ glog.V(4).Infof("📋 ListTopics called, isLockOwner=%v", b.isLockOwner())
+
if !b.isLockOwner() {
+ glog.V(4).Infof("📋 ListTopics proxying to lock owner: %s", b.lockAsBalancer.LockOwner())
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request)
return nil
@@ -53,12 +57,32 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
return resp, err
}
+ glog.V(4).Infof("📋 ListTopics starting - getting in-memory topics")
ret := &mq_pb.ListTopicsResponse{}
- // Scan the filer directory structure to find all topics
+ // First, get topics from in-memory state (includes unflushed topics)
+ inMemoryTopics := b.localTopicManager.ListTopicsInMemory()
+ glog.V(4).Infof("📋 ListTopics found %d in-memory topics", len(inMemoryTopics))
+ topicMap := make(map[string]*schema_pb.Topic)
+
+ // Add in-memory topics to the result
+ for _, topic := range inMemoryTopics {
+ topicMap[topic.String()] = &schema_pb.Topic{
+ Namespace: topic.Namespace,
+ Name: topic.Name,
+ }
+ }
+
+ // Then, scan the filer directory structure to find persisted topics (fallback for topics not in memory)
+ // Use a shorter timeout for filer scanning to ensure Metadata requests remain fast
+ filerCtx, filerCancel := context.WithTimeout(ctx, 2*time.Second)
+ defer filerCancel()
+
+ glog.V(4).Infof("📋 ListTopics scanning filer for persisted topics (2s timeout)")
err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// List all namespaces under /topics
- stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+ glog.V(4).Infof("📋 ListTopics calling ListEntries for %s", filer.TopicsDir)
+ stream, err := client.ListEntries(filerCtx, &filer_pb.ListEntriesRequest{
Directory: filer.TopicsDir,
Limit: 1000,
})
@@ -66,6 +90,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
return err
}
+ glog.V(4).Infof("📋 ListTopics got ListEntries stream, processing namespaces...")
// Process each namespace
for {
@@ -85,7 +110,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
// List all topics in this namespace
- topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+ topicStream, err := client.ListEntries(filerCtx, &filer_pb.ListEntriesRequest{
Directory: namespacePath,
Limit: 1000,
})
@@ -113,7 +138,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
// Check if topic.conf exists
topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
- confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+ confResp, err := client.LookupDirectoryEntry(filerCtx, &filer_pb.LookupDirectoryEntryRequest{
Directory: topicPath,
Name: filer.TopicConfFile,
})
@@ -123,12 +148,14 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
}
if confResp.Entry != nil {
- // This is a valid topic
- topic := &schema_pb.Topic{
- Namespace: namespaceName,
- Name: topicName,
+ // This is a valid persisted topic - add to map if not already present
+ topicKey := fmt.Sprintf("%s.%s", namespaceName, topicName)
+ if _, exists := topicMap[topicKey]; !exists {
+ topicMap[topicKey] = &schema_pb.Topic{
+ Namespace: namespaceName,
+ Name: topicName,
+ }
}
- ret.Topics = append(ret.Topics, topic)
}
}
}
@@ -136,15 +163,107 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
return nil
})
+ // Convert map to slice for response (combines in-memory and persisted topics)
+ for _, topic := range topicMap {
+ ret.Topics = append(ret.Topics, topic)
+ }
+
if err != nil {
- glog.V(0).Infof("list topics from filer: %v", err)
- // Return empty response on error
- return &mq_pb.ListTopicsResponse{}, nil
+ glog.V(0).Infof("📋 ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics))
+ // Still return in-memory topics even if filer fails
+ } else {
+ glog.V(4).Infof("📋 ListTopics completed successfully: %d total topics (in-memory + persisted)", len(ret.Topics))
}
return ret, nil
}
+// TopicExists checks if a topic exists in memory or filer
+// Caches both positive and negative results to reduce filer load
+func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.TopicExistsRequest) (*mq_pb.TopicExistsResponse, error) {
+ if !b.isLockOwner() {
+ var resp *mq_pb.TopicExistsResponse
+ var err error
+ proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.TopicExists(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
+
+ if request.Topic == nil {
+ return &mq_pb.TopicExistsResponse{Exists: false}, nil
+ }
+
+ // Convert schema_pb.Topic to topic.Topic
+ topicObj := topic.Topic{
+ Namespace: request.Topic.Namespace,
+ Name: request.Topic.Name,
+ }
+ topicKey := topicObj.String()
+
+ // First check in-memory state (includes unflushed topics)
+ if b.localTopicManager.TopicExistsInMemory(topicObj) {
+ return &mq_pb.TopicExistsResponse{Exists: true}, nil
+ }
+
+ // Check cache for filer lookup results (both positive and negative)
+ b.topicExistsCacheMu.RLock()
+ if entry, found := b.topicExistsCache[topicKey]; found {
+ if time.Now().Before(entry.expiresAt) {
+ b.topicExistsCacheMu.RUnlock()
+ glog.V(4).Infof("TopicExists cache HIT for %s: %v", topicKey, entry.exists)
+ return &mq_pb.TopicExistsResponse{Exists: entry.exists}, nil
+ }
+ }
+ b.topicExistsCacheMu.RUnlock()
+
+ // Cache miss or expired - query filer for persisted topics
+ glog.V(4).Infof("TopicExists cache MISS for %s, querying filer", topicKey)
+ exists := false
+ err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name)
+ confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: topicPath,
+ Name: filer.TopicConfFile,
+ })
+ if err == nil && confResp.Entry != nil {
+ exists = true
+ }
+ return nil // Don't propagate error, just check existence
+ })
+
+ if err != nil {
+ glog.V(0).Infof("check topic existence in filer: %v", err)
+ // Don't cache errors - return false and let next check retry
+ return &mq_pb.TopicExistsResponse{Exists: false}, nil
+ }
+
+ // Update cache with result (both positive and negative)
+ b.topicExistsCacheMu.Lock()
+ b.topicExistsCache[topicKey] = &topicExistsCacheEntry{
+ exists: exists,
+ expiresAt: time.Now().Add(b.topicExistsCacheTTL),
+ }
+ b.topicExistsCacheMu.Unlock()
+ glog.V(4).Infof("TopicExists cached result for %s: %v", topicKey, exists)
+
+ return &mq_pb.TopicExistsResponse{Exists: exists}, nil
+}
+
+// invalidateTopicExistsCache removes a topic from the cache
+// Should be called when a topic is created or deleted
+func (b *MessageQueueBroker) invalidateTopicExistsCache(t topic.Topic) {
+ topicKey := t.String()
+ b.topicExistsCacheMu.Lock()
+ delete(b.topicExistsCache, topicKey)
+ b.topicExistsCacheMu.Unlock()
+ glog.V(4).Infof("Invalidated TopicExists cache for %s", topicKey)
+}
+
// GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
if !b.isLockOwner() {
@@ -178,7 +297,8 @@ func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request
ret := &mq_pb.GetTopicConfigurationResponse{
Topic: request.Topic,
PartitionCount: int32(len(conf.BrokerPartitionAssignments)),
- RecordType: conf.RecordType,
+ MessageRecordType: conf.MessageRecordType,
+ KeyColumns: conf.KeyColumns,
BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
CreatedAtNs: createdAtNs,
LastUpdatedNs: modifiedAtNs,