Diffstat (limited to 'weed/mq/broker/broker_grpc_query.go')
-rw-r--r--  weed/mq/broker/broker_grpc_query.go | 358
1 file changed, 358 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go
new file mode 100644
index 000000000..21551e65e
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_query.go
@@ -0,0 +1,358 @@
+package broker
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+)
+
+// BufferRange represents an inclusive range [start, end] of buffer indexes
+// that have been flushed to disk
+type BufferRange struct {
+ start int64
+ end int64
+}
+
+// ErrNoPartitionAssignment indicates no broker assignment found for the partition.
+// This is a normal case that means there are no unflushed messages for this partition.
+var ErrNoPartitionAssignment = errors.New("no broker assignment found for partition")
+
+// GetUnflushedMessages streams messages from the broker's in-memory LogBuffer
+// that have not yet been flushed to disk, using buffer_start metadata for
+// deduplication. It filters by buffer index to avoid rescanning flushed data,
+// and redirects the request to the broker hosting the topic/partition when
+// the partition is not local.
+func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
+ // Convert protobuf types to internal types
+ t := topic.FromPbTopic(req.Topic)
+ partition := topic.FromPbPartition(req.Partition)
+
+ glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)
+
+ // Get the local partition for this topic/partition
+ b.accessLock.Lock()
+ localPartition := b.localTopicManager.GetLocalPartition(t, partition)
+ b.accessLock.Unlock()
+
+ if localPartition == nil {
+ // Topic/partition not found locally, attempt to find the correct broker and redirect
+ glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition)
+
+ // Look up which broker hosts this topic/partition
+ brokerHost, err := b.findBrokerForTopicPartition(req.Topic, req.Partition)
+ if err != nil {
+ if errors.Is(err, ErrNoPartitionAssignment) {
+ // Normal case: no broker assignment means no unflushed messages
+ glog.V(2).Infof("No broker assignment for %v %v - no unflushed messages", t, partition)
+ return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ EndOfStream: true,
+ })
+ }
+ return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ Error: fmt.Sprintf("failed to find broker for %v %v: %v", t, partition, err),
+ EndOfStream: true,
+ })
+ }
+
+ if brokerHost == "" {
+ // This should not happen after ErrNoPartitionAssignment check, but keep for safety
+ glog.V(2).Infof("Empty broker host for %v %v - no unflushed messages", t, partition)
+ return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ EndOfStream: true,
+ })
+ }
+
+ // Redirect to the correct broker
+ glog.V(1).Infof("Redirecting GetUnflushedMessages request for %v %v to broker %s", t, partition, brokerHost)
+ return b.redirectGetUnflushedMessages(brokerHost, req, stream)
+ }
+
+ // Build deduplication map from existing log files using buffer_start metadata
+ partitionDir := topic.PartitionDir(t, partition)
+ flushedBufferRanges, err := b.buildBufferStartDeduplicationMap(partitionDir)
+ if err != nil {
+ glog.Errorf("Failed to build deduplication map for %v %v: %v", t, partition, err)
+ // Continue with empty map - better to potentially duplicate than to miss data
+ flushedBufferRanges = make([]BufferRange, 0)
+ }
+
+ // Use buffer_start index for precise deduplication
+ lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
+ startBufferIndex := req.StartBufferIndex
+ startTimeNs := lastFlushTsNs // Still respect last flush time for safety
+
+ glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
+ t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))
+
+ // Stream messages from LogBuffer with filtering
+ messageCount := 0
+ startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
+
+ // Iterate over the in-memory log via LoopProcessLogDataWithBatchIndex so
+ // filtering and streaming share a single traversal code path
+ _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
+ "GetUnflushedMessages",
+ startPosition,
+ 0, // stopTsNs = 0 means process all available data
+ func() bool { return false }, // waitForDataFn always returns false: don't wait for new data
+ func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
+ // Apply buffer index filtering if specified
+ if startBufferIndex > 0 && batchIndex < startBufferIndex {
+ glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
+ return false, nil
+ }
+
+ // Check if this message is from a buffer range that's already been flushed
+ if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
+ glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
+ return false, nil
+ }
+
+ // Stream this message
+ err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ Message: &mq_pb.LogEntry{
+ TsNs: logEntry.TsNs,
+ Key: logEntry.Key,
+ Data: logEntry.Data,
+ PartitionKeyHash: uint32(logEntry.PartitionKeyHash),
+ },
+ EndOfStream: false,
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to stream message: %v", err)
+ return true, err // isDone = true to stop processing
+ }
+
+ messageCount++
+ return false, nil // Continue processing
+ },
+ )
+
+ // Handle collection errors
+ if err != nil && !errors.Is(err, log_buffer.ResumeFromDiskError) {
+ streamErr := stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ Error: fmt.Sprintf("failed to stream unflushed messages: %v", err),
+ EndOfStream: true,
+ })
+ if streamErr != nil {
+ glog.Errorf("Failed to send error response: %v", streamErr)
+ }
+ return err
+ }
+
+ // Send end-of-stream marker
+ err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
+ EndOfStream: true,
+ })
+
+ if err != nil {
+ glog.Errorf("Failed to send end-of-stream marker: %v", err)
+ return err
+ }
+
+ glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
+ return nil
+}
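+
+// A minimal client-side sketch of consuming this stream (illustrative only,
+// not part of this change; the helper name drainUnflushed and the pre-dialed
+// client are assumptions):
+//
+//	func drainUnflushed(ctx context.Context, client mq_pb.SeaweedMessagingClient, req *mq_pb.GetUnflushedMessagesRequest) error {
+//		stream, err := client.GetUnflushedMessages(ctx, req)
+//		if err != nil {
+//			return err
+//		}
+//		for {
+//			resp, err := stream.Recv()
+//			if errors.Is(err, io.EOF) {
+//				return nil // server closed the stream
+//			}
+//			if err != nil {
+//				return err
+//			}
+//			if resp.Error != "" {
+//				return errors.New(resp.Error) // in-band error from the broker
+//			}
+//			if resp.Message != nil {
+//				_ = resp.Message // process TsNs, Key, Data, PartitionKeyHash here
+//			}
+//			if resp.EndOfStream {
+//				return nil
+//			}
+//		}
+//	}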
+
+// buildBufferStartDeduplicationMap scans log files to build a map of buffer ranges
+// that have been flushed to disk, using the buffer_start metadata
+func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir string) ([]BufferRange, error) {
+ var flushedRanges []BufferRange
+
+ // List all files in the partition directory using filer client accessor
+ // Use pagination to handle directories with more than 1000 files
+ err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ var lastFileName string
+ var hasMore = true
+
+ for hasMore {
+ var currentBatchProcessed int
+ err := filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ currentBatchProcessed++
+ lastFileName = entry.Name
+
+ if entry.IsDirectory {
+ return nil
+ }
+
+ // Skip Parquet files - they don't represent buffer ranges
+ if strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+
+ // Skip offset files
+ if strings.HasSuffix(entry.Name, ".offset") {
+ return nil
+ }
+
+ // Get buffer start for this file
+ bufferStart, err := b.getLogBufferStartFromFile(entry)
+ if err != nil {
+ glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err)
+ return nil // Continue with other files
+ }
+
+ if bufferStart == nil {
+ // File has no buffer metadata - skip deduplication for this file
+ glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name)
+ return nil
+ }
+
+ // Calculate the buffer range covered by this file, assuming each
+ // chunk in the file corresponds to one flushed buffer
+ chunkCount := int64(len(entry.GetChunks()))
+ if chunkCount > 0 {
+ fileRange := BufferRange{
+ start: bufferStart.StartIndex,
+ end: bufferStart.StartIndex + chunkCount - 1,
+ }
+ flushedRanges = append(flushedRanges, fileRange)
+ glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end)
+ }
+
+ return nil
+ }, lastFileName, false, 1000) // Start from last processed file name for next batch
+
+ if err != nil {
+ return err
+ }
+
+ // A full batch may mean more entries remain; a short batch means
+ // we have reached the end of the directory
+ hasMore = currentBatchProcessed == 1000
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return flushedRanges, fmt.Errorf("failed to list partition directory %s: %v", partitionDir, err)
+ }
+
+ return flushedRanges, nil
+}
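+
+// For example, three log files with buffer_start values 0, 4, and 8, each
+// containing 4 chunks, would produce flushedRanges of [0-3], [4-7], and
+// [8-11]; any in-memory message whose buffer index falls inside those ranges
+// is skipped by GetUnflushedMessages as already flushed.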
+
+// getLogBufferStartFromFile extracts LogBufferStart metadata from a log file
+func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
+ if entry.Extended == nil {
+ return nil, nil
+ }
+
+ // Only support binary buffer_start format
+ if startData, exists := entry.Extended["buffer_start"]; exists {
+ if len(startData) == 8 {
+ startIndex := int64(binary.BigEndian.Uint64(startData))
+ if startIndex > 0 {
+ return &LogBufferStart{StartIndex: startIndex}, nil
+ }
+ } else {
+ return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
+ }
+ }
+
+ return nil, nil
+}
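+
+// For reference, the 8-byte big-endian value parsed above is assumed to be
+// produced by the flush path roughly like this (illustrative sketch; the
+// actual writer lives elsewhere in the codebase):
+//
+//	buf := make([]byte, 8)
+//	binary.BigEndian.PutUint64(buf, uint64(startIndex))
+//	entry.Extended["buffer_start"] = buf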
+
+// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges
+func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
+ for _, flushedRange := range flushedRanges {
+ if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
+ return true
+ }
+ }
+ return false
+}
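+
+// For example, with flushedRanges [{3, 7}, {10, 12}], index 5 is reported as
+// flushed while index 8 is not.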
+
+// findBrokerForTopicPartition finds which broker hosts the specified topic/partition
+func (b *MessageQueueBroker) findBrokerForTopicPartition(topic *schema_pb.Topic, partition *schema_pb.Partition) (string, error) {
+ // Use LookupTopicBrokers to find which broker hosts this topic/partition
+ ctx := context.Background()
+ lookupReq := &mq_pb.LookupTopicBrokersRequest{
+ Topic: topic,
+ }
+
+ // If we're not the lock owner (balancer), we need to redirect to the balancer first
+ var lookupResp *mq_pb.LookupTopicBrokersResponse
+ var err error
+
+ if !b.isLockOwner() {
+ // Redirect to balancer to get topic broker assignments
+ balancerAddress := pb.ServerAddress(b.lockAsBalancer.LockOwner())
+ err = b.withBrokerClient(false, balancerAddress, func(client mq_pb.SeaweedMessagingClient) error {
+ lookupResp, err = client.LookupTopicBrokers(ctx, lookupReq)
+ return err
+ })
+ } else {
+ // We are the balancer, handle the lookup directly
+ lookupResp, err = b.LookupTopicBrokers(ctx, lookupReq)
+ }
+
+ if err != nil {
+ return "", fmt.Errorf("failed to lookup topic brokers: %v", err)
+ }
+
+ // Find the broker assignment that matches our partition
+ for _, assignment := range lookupResp.BrokerPartitionAssignments {
+ if b.partitionsMatch(partition, assignment.Partition) {
+ if assignment.LeaderBroker != "" {
+ return assignment.LeaderBroker, nil
+ }
+ }
+ }
+
+ return "", ErrNoPartitionAssignment
+}
+
+// partitionsMatch checks if two partitions represent the same partition
+func (b *MessageQueueBroker) partitionsMatch(p1, p2 *schema_pb.Partition) bool {
+ return p1.RingSize == p2.RingSize &&
+ p1.RangeStart == p2.RangeStart &&
+ p1.RangeStop == p2.RangeStop &&
+ p1.UnixTimeNs == p2.UnixTimeNs
+}
+
+// redirectGetUnflushedMessages forwards the GetUnflushedMessages request to the correct broker
+func (b *MessageQueueBroker) redirectGetUnflushedMessages(brokerHost string, req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
+ ctx := stream.Context()
+
+ // Connect to the target broker and forward the request
+ return b.withBrokerClient(false, pb.ServerAddress(brokerHost), func(client mq_pb.SeaweedMessagingClient) error {
+ // Create a new stream to the target broker
+ targetStream, err := client.GetUnflushedMessages(ctx, req)
+ if err != nil {
+ return fmt.Errorf("failed to create stream to broker %s: %v", brokerHost, err)
+ }
+
+ // Forward all responses from the target broker to our client
+ for {
+ response, err := targetStream.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ // Normal end of stream
+ return nil
+ }
+ return fmt.Errorf("error receiving from broker %s: %v", brokerHost, err)
+ }
+
+ // Forward the response to our client
+ if sendErr := stream.Send(response); sendErr != nil {
+ return fmt.Errorf("error forwarding response to client: %v", sendErr)
+ }
+
+ // Check if this is the end of stream
+ if response.EndOfStream {
+ return nil
+ }
+ }
+ })
+}