Diffstat (limited to 'weed/mq/broker/broker_grpc_query.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_query.go | 358 |
1 file changed, 358 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go
new file mode 100644
index 000000000..21551e65e
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_query.go
@@ -0,0 +1,358 @@
+package broker
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+)
+
+// BufferRange represents a range of buffer indexes that have been flushed to disk
+type BufferRange struct {
+	start int64
+	end   int64
+}
+
+// ErrNoPartitionAssignment indicates no broker assignment found for the partition.
+// This is a normal case that means there are no unflushed messages for this partition.
+var ErrNoPartitionAssignment = errors.New("no broker assignment found for partition")
+
+// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
+// that haven't been flushed to disk yet, using buffer_start metadata for deduplication.
+// It supports streaming responses and buffer index filtering for better performance,
+// and includes broker routing to redirect requests to the correct broker hosting the topic/partition.
+func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
+	// Convert protobuf types to internal types
+	t := topic.FromPbTopic(req.Topic)
+	partition := topic.FromPbPartition(req.Partition)
+
+	glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)
+
+	// Get the local partition for this topic/partition
+	b.accessLock.Lock()
+	localPartition := b.localTopicManager.GetLocalPartition(t, partition)
+	b.accessLock.Unlock()
+
+	if localPartition == nil {
+		// Topic/partition not found locally, attempt to find the correct broker and redirect
+		glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition)
+
+		// Look up which broker hosts this topic/partition
+		brokerHost, err := b.findBrokerForTopicPartition(req.Topic, req.Partition)
+		if err != nil {
+			if errors.Is(err, ErrNoPartitionAssignment) {
+				// Normal case: no broker assignment means no unflushed messages
+				glog.V(2).Infof("No broker assignment for %v %v - no unflushed messages", t, partition)
+				return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+					EndOfStream: true,
+				})
+			}
+			return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+				Error:       fmt.Sprintf("failed to find broker for %v %v: %v", t, partition, err),
+				EndOfStream: true,
+			})
+		}
+
+		if brokerHost == "" {
+			// This should not happen after the ErrNoPartitionAssignment check, but keep for safety
+			glog.V(2).Infof("Empty broker host for %v %v - no unflushed messages", t, partition)
+			return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+				EndOfStream: true,
+			})
+		}
+
+		// Redirect to the correct broker
+		glog.V(1).Infof("Redirecting GetUnflushedMessages request for %v %v to broker %s", t, partition, brokerHost)
+		return b.redirectGetUnflushedMessages(brokerHost, req, stream)
+	}
+
+	// Build deduplication map from existing log files using buffer_start metadata
+	partitionDir := topic.PartitionDir(t, partition)
+	flushedBufferRanges, err := b.buildBufferStartDeduplicationMap(partitionDir)
+	if err != nil {
+		glog.Errorf("Failed to build deduplication map for %v %v: %v", t, partition, err)
+		// Continue with an empty map - better to potentially duplicate than to miss data
+		flushedBufferRanges = make([]BufferRange, 0)
+	}
+
+	// Use buffer_start index for precise deduplication
+	lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
+	startBufferIndex := req.StartBufferIndex
+	startTimeNs := lastFlushTsNs // Still respect last flush time for safety
+
+	glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
+		t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))
+
+	// Stream messages from LogBuffer with filtering
+	messageCount := 0
+	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
+
+	// Use the LoopProcessLogDataWithBatchIndex method to avoid code duplication
+	_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
+		"GetUnflushedMessages",
+		startPosition,
+		0,                            // stopTsNs = 0 means process all available data
+		func() bool { return false }, // waitForDataFn = false means don't wait for new data
+		func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
+			// Apply buffer index filtering if specified
+			if startBufferIndex > 0 && batchIndex < startBufferIndex {
+				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
+				return false, nil
+			}
+
+			// Check if this message is from a buffer range that's already been flushed
+			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
+				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
+				return false, nil
+			}
+
+			// Stream this message
+			err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+				Message: &mq_pb.LogEntry{
+					TsNs:             logEntry.TsNs,
+					Key:              logEntry.Key,
+					Data:             logEntry.Data,
+					PartitionKeyHash: uint32(logEntry.PartitionKeyHash),
+				},
+				EndOfStream: false,
+			})
+
+			if err != nil {
+				glog.Errorf("Failed to stream message: %v", err)
+				return true, err // isDone = true to stop processing
+			}
+
+			messageCount++
+			return false, nil // Continue processing
+		},
+	)
+
+	// Handle collection errors
+	if err != nil && err != log_buffer.ResumeFromDiskError {
+		streamErr := stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+			Error:       fmt.Sprintf("failed to stream unflushed messages: %v", err),
+			EndOfStream: true,
+		})
+		if streamErr != nil {
+			glog.Errorf("Failed to send error response: %v", streamErr)
+		}
+		return err
+	}
+
+	// Send end-of-stream marker
+	err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+		EndOfStream: true,
+	})
+
+	if err != nil {
+		glog.Errorf("Failed to send end-of-stream marker: %v", err)
+		return err
+	}
+
+	glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
+	return nil
+}
+
+// buildBufferStartDeduplicationMap scans log files to build a map of buffer ranges
+// that have been flushed to disk, using the buffer_start metadata
+func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir string) ([]BufferRange, error) {
+	var flushedRanges []BufferRange
+
+	// List all files in the partition directory using the filer client accessor.
+	// Use pagination to handle directories with more than 1000 files.
+	err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		var lastFileName string
+		var hasMore = true
+
+		for hasMore {
+			var currentBatchProcessed int
+			err := filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+				currentBatchProcessed++
+				hasMore = !isLast // If this is the last entry of a full batch, there might be more
+				lastFileName = entry.Name
+
+				if entry.IsDirectory {
+					return nil
+				}
+
+				// Skip Parquet files - they don't represent buffer ranges
+				if strings.HasSuffix(entry.Name, ".parquet") {
+					return nil
+				}
+
+				// Skip offset files
+				if strings.HasSuffix(entry.Name, ".offset") {
+					return nil
+				}
+
+				// Get buffer start for this file
+				bufferStart, err := b.getLogBufferStartFromFile(entry)
+				if err != nil {
+					glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err)
+					return nil // Continue with other files
+				}
+
+				if bufferStart == nil {
+					// File has no buffer metadata - skip deduplication for this file
+					glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name)
+					return nil
+				}
+
+				// Calculate the buffer range covered by this file
+				chunkCount := int64(len(entry.GetChunks()))
+				if chunkCount > 0 {
+					fileRange := BufferRange{
+						start: bufferStart.StartIndex,
+						end:   bufferStart.StartIndex + chunkCount - 1,
+					}
+					flushedRanges = append(flushedRanges, fileRange)
+					glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end)
+				}
+
+				return nil
+			}, lastFileName, false, 1000) // Start from last processed file name for the next batch
+
+			if err != nil {
+				return err
+			}
+
+			// If we processed fewer than 1000 entries, we've reached the end
+			if currentBatchProcessed < 1000 {
+				hasMore = false
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return flushedRanges, fmt.Errorf("failed to list partition directory %s: %v", partitionDir, err)
+	}
+
+	return flushedRanges, nil
+}
+
+// getLogBufferStartFromFile extracts LogBufferStart metadata from a log file
+func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
+	if entry.Extended == nil {
+		return nil, nil
+	}
+
+	// Only support the binary buffer_start format
+	if startData, exists := entry.Extended["buffer_start"]; exists {
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return &LogBufferStart{StartIndex: startIndex}, nil
+			}
+		} else {
+			return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
+		}
+	}
+
+	return nil, nil
+}
+
+// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges
+func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
+	for _, flushedRange := range flushedRanges {
+		if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
+			return true
+		}
+	}
+	return false
+}
+
+// findBrokerForTopicPartition finds which broker hosts the specified topic/partition
+func (b *MessageQueueBroker) findBrokerForTopicPartition(topic *schema_pb.Topic, partition *schema_pb.Partition) (string, error) {
+	// Use LookupTopicBrokers to find which broker hosts this topic/partition
+	ctx := context.Background()
+	lookupReq := &mq_pb.LookupTopicBrokersRequest{
+		Topic: topic,
+	}
+
+	// If we're not the lock owner (balancer), we need to redirect to the balancer first
+	var lookupResp *mq_pb.LookupTopicBrokersResponse
+	var err error
+
+	if !b.isLockOwner() {
+		// Redirect to the balancer to get topic broker assignments
+		balancerAddress := pb.ServerAddress(b.lockAsBalancer.LockOwner())
+		err = b.withBrokerClient(false, balancerAddress, func(client mq_pb.SeaweedMessagingClient) error {
+			lookupResp, err = client.LookupTopicBrokers(ctx, lookupReq)
+			return err
+		})
+	} else {
+		// We are the balancer, handle the lookup directly
+		lookupResp, err = b.LookupTopicBrokers(ctx, lookupReq)
+	}
+
+	if err != nil {
+		return "", fmt.Errorf("failed to lookup topic brokers: %v", err)
+	}
+
+	// Find the broker assignment that matches our partition
+	for _, assignment := range lookupResp.BrokerPartitionAssignments {
+		if b.partitionsMatch(partition, assignment.Partition) {
+			if assignment.LeaderBroker != "" {
+				return assignment.LeaderBroker, nil
+			}
+		}
+	}
+
+	return "", ErrNoPartitionAssignment
+}
+
+// partitionsMatch checks if two partitions represent the same partition
+func (b *MessageQueueBroker) partitionsMatch(p1, p2 *schema_pb.Partition) bool {
+	return p1.RingSize == p2.RingSize &&
+		p1.RangeStart == p2.RangeStart &&
+		p1.RangeStop == p2.RangeStop &&
+		p1.UnixTimeNs == p2.UnixTimeNs
+}
+
+// redirectGetUnflushedMessages forwards the GetUnflushedMessages request to the correct broker
+func (b *MessageQueueBroker) redirectGetUnflushedMessages(brokerHost string, req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
+	ctx := stream.Context()
+
+	// Connect to the target broker and forward the request
+	return b.withBrokerClient(false, pb.ServerAddress(brokerHost), func(client mq_pb.SeaweedMessagingClient) error {
+		// Create a new stream to the target broker
+		targetStream, err := client.GetUnflushedMessages(ctx, req)
+		if err != nil {
+			return fmt.Errorf("failed to create stream to broker %s: %v", brokerHost, err)
+		}
+
+		// Forward all responses from the target broker to our client
+		for {
+			response, err := targetStream.Recv()
+			if err != nil {
+				if errors.Is(err, io.EOF) {
+					// Normal end of stream
+					return nil
+				}
+				return fmt.Errorf("error receiving from broker %s: %v", brokerHost, err)
+			}
+
+			// Forward the response to our client
+			if sendErr := stream.Send(response); sendErr != nil {
+				return fmt.Errorf("error forwarding response to client: %v", sendErr)
+			}
+
+			// Check if this is the end of stream
+			if response.EndOfStream {
+				return nil
+			}
+		}
+	})
+}
