aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_query.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_query.go')
-rw-r--r--weed/mq/broker/broker_grpc_query.go57
1 files changed, 25 insertions, 32 deletions
diff --git a/weed/mq/broker/broker_grpc_query.go b/weed/mq/broker/broker_grpc_query.go
index 21551e65e..228152bdf 100644
--- a/weed/mq/broker/broker_grpc_query.go
+++ b/weed/mq/broker/broker_grpc_query.go
@@ -17,7 +17,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
-// BufferRange represents a range of buffer indexes that have been flushed to disk
+// BufferRange represents a range of buffer offsets that have been flushed to disk
type BufferRange struct {
start int64
end int64
@@ -29,19 +29,22 @@ var ErrNoPartitionAssignment = errors.New("no broker assignment found for partit
// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
// that haven't been flushed to disk yet, using buffer_start metadata for deduplication
-// Now supports streaming responses and buffer index filtering for better performance
+// Now supports streaming responses and buffer offset filtering for better performance
// Includes broker routing to redirect requests to the correct broker hosting the topic/partition
func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
// Convert protobuf types to internal types
t := topic.FromPbTopic(req.Topic)
partition := topic.FromPbPartition(req.Partition)
- glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)
-
- // Get the local partition for this topic/partition
- b.accessLock.Lock()
- localPartition := b.localTopicManager.GetLocalPartition(t, partition)
- b.accessLock.Unlock()
+ // Get or generate the local partition for this topic/partition (similar to subscriber flow)
+ localPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
+ if getOrGenErr != nil {
+ // Fall back to the original logic for broker routing
+ b.accessLock.Lock()
+ localPartition = b.localTopicManager.GetLocalPartition(t, partition)
+ b.accessLock.Unlock()
+ } else {
+ }
if localPartition == nil {
// Topic/partition not found locally, attempt to find the correct broker and redirect
@@ -85,45 +88,36 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
flushedBufferRanges = make([]BufferRange, 0)
}
- // Use buffer_start index for precise deduplication
+ // Use buffer_start offset for precise deduplication
lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
- startBufferIndex := req.StartBufferIndex
+ startBufferOffset := req.StartBufferOffset
startTimeNs := lastFlushTsNs // Still respect last flush time for safety
- glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
- t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))
-
// Stream messages from LogBuffer with filtering
messageCount := 0
- startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
+ startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferOffset)
- // Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication
- _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
+ // Use the new LoopProcessLogDataWithOffset method to avoid code duplication
+ _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithOffset(
"GetUnflushedMessages",
startPosition,
0, // stopTsNs = 0 means process all available data
func() bool { return false }, // waitForDataFn = false means don't wait for new data
- func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
- // Apply buffer index filtering if specified
- if startBufferIndex > 0 && batchIndex < startBufferIndex {
- glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
+ func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) {
+
+ // Apply buffer offset filtering if specified
+ if startBufferOffset > 0 && offset < startBufferOffset {
return false, nil
}
// Check if this message is from a buffer range that's already been flushed
- if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
- glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
+ if b.isBufferOffsetFlushed(offset, flushedBufferRanges) {
return false, nil
}
// Stream this message
err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- Message: &mq_pb.LogEntry{
- TsNs: logEntry.TsNs,
- Key: logEntry.Key,
- Data: logEntry.Data,
- PartitionKeyHash: uint32(logEntry.PartitionKeyHash),
- },
+ Message: logEntry,
EndOfStream: false,
})
@@ -159,7 +153,6 @@ func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessage
return err
}
- glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
return nil
}
@@ -263,10 +256,10 @@ func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*
return nil, nil
}
-// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges
-func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
+// isBufferOffsetFlushed checks if a buffer offset is covered by any of the flushed ranges
+func (b *MessageQueueBroker) isBufferOffsetFlushed(bufferOffset int64, flushedRanges []BufferRange) bool {
for _, flushedRange := range flushedRanges {
- if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
+ if bufferOffset >= flushedRange.start && bufferOffset <= flushedRange.end {
return true
}
}