diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_lookup.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_lookup.go | 146 |
1 files changed, 133 insertions, 13 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index d2dfcaa41..680fba87b 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -42,7 +43,10 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq } func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) { + glog.V(4).Infof("📋 ListTopics called, isLockOwner=%v", b.isLockOwner()) + if !b.isLockOwner() { + glog.V(4).Infof("📋 ListTopics proxying to lock owner: %s", b.lockAsBalancer.LockOwner()) proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { resp, err = client.ListTopics(ctx, request) return nil @@ -53,12 +57,32 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List return resp, err } + glog.V(4).Infof("📋 ListTopics starting - getting in-memory topics") ret := &mq_pb.ListTopicsResponse{} - // Scan the filer directory structure to find all topics + // First, get topics from in-memory state (includes unflushed topics) + inMemoryTopics := b.localTopicManager.ListTopicsInMemory() + glog.V(4).Infof("📋 ListTopics found %d in-memory topics", len(inMemoryTopics)) + topicMap := make(map[string]*schema_pb.Topic) + + // Add in-memory topics to the result + for _, topic := range inMemoryTopics { + topicMap[topic.String()] = &schema_pb.Topic{ + Namespace: topic.Namespace, + Name: topic.Name, + } + } + + // Then, scan the filer directory structure to find persisted topics (fallback for topics not in memory) + // Use a shorter timeout for filer scanning to ensure Metadata requests remain fast + filerCtx, filerCancel := context.WithTimeout(ctx, 2*time.Second) + defer filerCancel() + + glog.V(4).Infof("📋 ListTopics scanning filer for persisted topics (2s timeout)") err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // List all namespaces under /topics - stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + glog.V(4).Infof("📋 ListTopics calling ListEntries for %s", filer.TopicsDir) + stream, err := client.ListEntries(filerCtx, &filer_pb.ListEntriesRequest{ Directory: filer.TopicsDir, Limit: 1000, }) @@ -66,6 +90,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err) return err } + glog.V(4).Infof("📋 ListTopics got ListEntries stream, processing namespaces...") // Process each namespace for { @@ -85,7 +110,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName) // List all topics in this namespace - topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + topicStream, err := client.ListEntries(filerCtx, &filer_pb.ListEntriesRequest{ Directory: namespacePath, Limit: 1000, }) @@ -113,7 +138,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List // Check if topic.conf exists topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName) - confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + confResp, err := client.LookupDirectoryEntry(filerCtx, &filer_pb.LookupDirectoryEntryRequest{ Directory: topicPath, Name: filer.TopicConfFile, }) @@ -123,12 +148,14 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List } if confResp.Entry != nil { - // This is a valid topic - topic := &schema_pb.Topic{ - Namespace: namespaceName, - Name: topicName, + // This is a valid persisted topic - add to map if not already present + topicKey := fmt.Sprintf("%s.%s", namespaceName, topicName) + if _, exists := topicMap[topicKey]; !exists { + topicMap[topicKey] = &schema_pb.Topic{ + Namespace: namespaceName, + Name: topicName, + } } - ret.Topics = append(ret.Topics, topic) } } } @@ -136,15 +163,107 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List return nil }) + // Convert map to slice for response (combines in-memory and persisted topics) + for _, topic := range topicMap { + ret.Topics = append(ret.Topics, topic) + } + if err != nil { - glog.V(0).Infof("list topics from filer: %v", err) - // Return empty response on error - return &mq_pb.ListTopicsResponse{}, nil + glog.V(0).Infof("📋 ListTopics: filer scan failed: %v (returning %d in-memory topics)", err, len(inMemoryTopics)) + // Still return in-memory topics even if filer fails + } else { + glog.V(4).Infof("📋 ListTopics completed successfully: %d total topics (in-memory + persisted)", len(ret.Topics)) } return ret, nil } +// TopicExists checks if a topic exists in memory or filer +// Caches both positive and negative results to reduce filer load +func (b *MessageQueueBroker) TopicExists(ctx context.Context, request *mq_pb.TopicExistsRequest) (*mq_pb.TopicExistsResponse, error) { + if !b.isLockOwner() { + var resp *mq_pb.TopicExistsResponse + var err error + proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error { + resp, err = client.TopicExists(ctx, request) + return nil + }) + if proxyErr != nil { + return nil, proxyErr + } + return resp, err + } + + if request.Topic == nil { + return &mq_pb.TopicExistsResponse{Exists: false}, nil + } + + // Convert schema_pb.Topic to topic.Topic + topicObj := topic.Topic{ + Namespace: request.Topic.Namespace, + Name: request.Topic.Name, + } + topicKey := topicObj.String() + + // First check in-memory state (includes unflushed topics) + if b.localTopicManager.TopicExistsInMemory(topicObj) { + return &mq_pb.TopicExistsResponse{Exists: true}, nil + } + + // Check cache for filer lookup results (both positive and negative) + b.topicExistsCacheMu.RLock() + if entry, found := b.topicExistsCache[topicKey]; found { + if time.Now().Before(entry.expiresAt) { + b.topicExistsCacheMu.RUnlock() + glog.V(4).Infof("TopicExists cache HIT for %s: %v", topicKey, entry.exists) + return &mq_pb.TopicExistsResponse{Exists: entry.exists}, nil + } + } + b.topicExistsCacheMu.RUnlock() + + // Cache miss or expired - query filer for persisted topics + glog.V(4).Infof("TopicExists cache MISS for %s, querying filer", topicKey) + exists := false + err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + topicPath := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, request.Topic.Namespace, request.Topic.Name) + confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + Directory: topicPath, + Name: filer.TopicConfFile, + }) + if err == nil && confResp.Entry != nil { + exists = true + } + return nil // Don't propagate error, just check existence + }) + + if err != nil { + glog.V(0).Infof("check topic existence in filer: %v", err) + // Don't cache errors - return false and let next check retry + return &mq_pb.TopicExistsResponse{Exists: false}, nil + } + + // Update cache with result (both positive and negative) + b.topicExistsCacheMu.Lock() + b.topicExistsCache[topicKey] = &topicExistsCacheEntry{ + exists: exists, + expiresAt: time.Now().Add(b.topicExistsCacheTTL), + } + b.topicExistsCacheMu.Unlock() + glog.V(4).Infof("TopicExists cached result for %s: %v", topicKey, exists) + + return &mq_pb.TopicExistsResponse{Exists: exists}, nil +} + +// invalidateTopicExistsCache removes a topic from the cache +// Should be called when a topic is created or deleted +func (b *MessageQueueBroker) invalidateTopicExistsCache(t topic.Topic) { + topicKey := t.String() + b.topicExistsCacheMu.Lock() + delete(b.topicExistsCache, topicKey) + b.topicExistsCacheMu.Unlock() + glog.V(4).Infof("Invalidated TopicExists cache for %s", topicKey) +} + // GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) { if !b.isLockOwner() { @@ -178,7 +297,8 @@ func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request ret := &mq_pb.GetTopicConfigurationResponse{ Topic: request.Topic, PartitionCount: int32(len(conf.BrokerPartitionAssignments)), - RecordType: conf.RecordType, + MessageRecordType: conf.MessageRecordType, + KeyColumns: conf.KeyColumns, BrokerPartitionAssignments: conf.BrokerPartitionAssignments, CreatedAtNs: createdAtNs, LastUpdatedNs: modifiedAtNs, |
