diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_query.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_query.go | 57 |
1 files changed, 25 insertions, 32 deletions
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go index 21551e65e..228152bdf 100644 --- a/weed/mq/broker/broker_grpc_query.go +++ b/weed/mq/broker/broker_grpc_query.go @@ -17,7 +17,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" ) -// BufferRange represents a range of buffer indexes that have been flushed to disk +// BufferRange represents a range of buffer offsets that have been flushed to disk type BufferRange struct { start int64 end int64 @@ -29,19 +29,22 @@ var ErrNoPartitionAssignment = errors.New("no broker assignment found for partit // GetUnflushedMessages returns messages from the broker's in-memory LogBuffer // that haven't been flushed to disk yet, using buffer_start metadata for deduplication -// Now supports streaming responses and buffer index filtering for better performance +// Now supports streaming responses and buffer offset filtering for better performance // Includes broker routing to redirect requests to the correct broker hosting the topic/partition func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error { // Convert protobuf types to internal types t := topic.FromPbTopic(req.Topic) partition := topic.FromPbPartition(req.Partition) - glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition) - - // Get the local partition for this topic/partition - b.accessLock.Lock() - localPartition := b.localTopicManager.GetLocalPartition(t, partition) - b.accessLock.Unlock() + // Get or generate the local partition for this topic/partition (similar to subscriber flow) + localPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) + if getOrGenErr != nil { + // Fall back to the original logic for broker routing + b.accessLock.Lock() + localPartition = b.localTopicManager.GetLocalPartition(t, partition) + b.accessLock.Unlock() + } else { + } if localPartition == nil { // Topic/partition not found locally, attempt to find the correct broker and redirect @@ -85,45 +88,36 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage flushedBufferRanges = make([]BufferRange, 0) } - // Use buffer_start index for precise deduplication + // Use buffer_start offset for precise deduplication lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs - startBufferIndex := req.StartBufferIndex + startBufferOffset := req.StartBufferOffset startTimeNs := lastFlushTsNs // Still respect last flush time for safety - glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges", - t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges)) - // Stream messages from LogBuffer with filtering messageCount := 0 - startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex) + startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferOffset) - // Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication - _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex( + // Use the new LoopProcessLogDataWithOffset method to avoid code duplication + _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithOffset( "GetUnflushedMessages", startPosition, 0, // stopTsNs = 0 means process all available data func() bool { return false }, // waitForDataFn = false means don't wait for new data - func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) { - // Apply buffer index filtering if specified - if startBufferIndex > 0 && batchIndex < startBufferIndex { - glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex) + func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) { + + // Apply buffer offset filtering if specified + if startBufferOffset > 0 && offset < startBufferOffset { return false, nil } // Check if this message is from a buffer range that's already been flushed - if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) { - glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex) + if b.isBufferOffsetFlushed(offset, flushedBufferRanges) { return false, nil } // Stream this message err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{ - Message: &mq_pb.LogEntry{ - TsNs: logEntry.TsNs, - Key: logEntry.Key, - Data: logEntry.Data, - PartitionKeyHash: uint32(logEntry.PartitionKeyHash), - }, + Message: logEntry, EndOfStream: false, }) @@ -159,7 +153,6 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage return err } - glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition) return nil } @@ -263,10 +256,10 @@ func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (* return nil, nil } -// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges -func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool { +// isBufferOffsetFlushed checks if a buffer offset is covered by any of the flushed ranges +func (b *MessageQueueBroker) isBufferOffsetFlushed(bufferOffset int64, flushedRanges []BufferRange) bool { for _, flushedRange := range flushedRanges { - if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end { + if bufferOffset >= flushedRange.start && bufferOffset <= flushedRange.end { return true } } |
