aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_fetch.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_fetch.go')
-rw-r--r--weed/mq/broker/broker_grpc_fetch.go170
1 files changed, 170 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go
new file mode 100644
index 000000000..19024d852
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_fetch.go
@@ -0,0 +1,170 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+// FetchMessage implements Kafka-style stateless message fetching
+// This is the recommended API for Kafka gateway and other stateless clients
+//
+// Key differences from SubscribeMessage:
+// 1. Request/Response pattern (not streaming)
+// 2. No session state maintained on broker
+// 3. Each request is completely independent
+// 4. Safe for concurrent calls at different offsets
+// 5. No Subscribe loop cancellation/restart complexity
+//
+// Design inspired by Kafka's Fetch API:
+// - Client manages offset tracking
+// - Each fetch is independent
+// - No shared state between requests
+// - Natural support for concurrent reads
+func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchMessageRequest) (*mq_pb.FetchMessageResponse, error) {
+ glog.V(3).Infof("[FetchMessage] CALLED!") // DEBUG: ensure this shows up
+
+ // Validate request
+ if req.Topic == nil {
+ return nil, fmt.Errorf("missing topic")
+ }
+ if req.Partition == nil {
+ return nil, fmt.Errorf("missing partition")
+ }
+
+ t := topic.FromPbTopic(req.Topic)
+ partition := topic.FromPbPartition(req.Partition)
+
+ glog.V(3).Infof("[FetchMessage] %s/%s partition=%v offset=%d maxMessages=%d maxBytes=%d consumer=%s/%s",
+ t.Namespace, t.Name, partition, req.StartOffset, req.MaxMessages, req.MaxBytes,
+ req.ConsumerGroup, req.ConsumerId)
+
+ // Get local partition
+ localPartition, err := b.GetOrGenerateLocalPartition(t, partition)
+ if err != nil {
+ glog.Errorf("[FetchMessage] Failed to get partition: %v", err)
+ return &mq_pb.FetchMessageResponse{
+ Error: fmt.Sprintf("partition not found: %v", err),
+ ErrorCode: 1,
+ }, nil
+ }
+ if localPartition == nil {
+ return &mq_pb.FetchMessageResponse{
+ Error: "partition not found",
+ ErrorCode: 1,
+ }, nil
+ }
+
+ // Set defaults for limits
+ maxMessages := int(req.MaxMessages)
+ if maxMessages <= 0 {
+ maxMessages = 100 // Reasonable default
+ }
+ if maxMessages > 10000 {
+ maxMessages = 10000 // Safety limit
+ }
+
+ maxBytes := int(req.MaxBytes)
+ if maxBytes <= 0 {
+ maxBytes = 4 * 1024 * 1024 // 4MB default
+ }
+ if maxBytes > 100*1024*1024 {
+ maxBytes = 100 * 1024 * 1024 // 100MB safety limit
+ }
+
+ // TODO: Long poll support disabled for now (causing timeouts)
+ // Check if we should wait for data (long poll support)
+ // shouldWait := req.MaxWaitMs > 0
+ // if shouldWait {
+ // // Wait for data to be available (with timeout)
+ // dataAvailable := localPartition.LogBuffer.WaitForDataWithTimeout(req.StartOffset, int(req.MaxWaitMs))
+ // if !dataAvailable {
+ // // Timeout - return empty response
+ // glog.V(3).Infof("[FetchMessage] Timeout waiting for data at offset %d", req.StartOffset)
+ // return &mq_pb.FetchMessageResponse{
+ // Messages: []*mq_pb.DataMessage{},
+ // HighWaterMark: localPartition.LogBuffer.GetHighWaterMark(),
+ // LogStartOffset: localPartition.LogBuffer.GetLogStartOffset(),
+ // EndOfPartition: false,
+ // NextOffset: req.StartOffset,
+ // }, nil
+ // }
+ // }
+
+ // Check if disk read function is configured
+ if localPartition.LogBuffer.ReadFromDiskFn == nil {
+ glog.Errorf("[FetchMessage] LogBuffer.ReadFromDiskFn is nil! This should not happen.")
+ } else {
+ glog.V(3).Infof("[FetchMessage] LogBuffer.ReadFromDiskFn is configured")
+ }
+
+ // Use requested offset directly - let ReadMessagesAtOffset handle disk reads
+ requestedOffset := req.StartOffset
+
+ // Read messages from LogBuffer (stateless read)
+ glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d",
+ t.Name, partition, requestedOffset, maxMessages, maxBytes)
+
+ logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset(
+ requestedOffset,
+ maxMessages,
+ maxBytes,
+ )
+
+ // CRITICAL: Log the result with full details
+ if len(logEntries) == 0 && highWaterMark > requestedOffset && err == nil {
+ glog.Errorf("[FetchMessage] CRITICAL: ReadMessagesAtOffset returned 0 entries but HWM=%d > requestedOffset=%d (should return data!)",
+ highWaterMark, requestedOffset)
+ glog.Errorf("[FetchMessage] Details: nextOffset=%d, endOfPartition=%v, bufferStartOffset=%d",
+ nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset())
+ }
+
+ glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v",
+ t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err)
+
+ if err != nil {
+ // Check if this is an "offset out of range" error
+ errMsg := err.Error()
+ if len(errMsg) > 0 && (len(errMsg) < 20 || errMsg[:20] != "offset") {
+ glog.Errorf("[FetchMessage] Read error: %v", err)
+ } else {
+ // Offset out of range - this is expected when consumer requests old data
+ glog.V(3).Infof("[FetchMessage] Offset out of range: %v", err)
+ }
+
+ // Return empty response with metadata - let client adjust offset
+ return &mq_pb.FetchMessageResponse{
+ Messages: []*mq_pb.DataMessage{},
+ HighWaterMark: highWaterMark,
+ LogStartOffset: localPartition.LogBuffer.GetLogStartOffset(),
+ EndOfPartition: false,
+ NextOffset: localPartition.LogBuffer.GetLogStartOffset(), // Suggest starting from earliest available
+ Error: errMsg,
+ ErrorCode: 2,
+ }, nil
+ }
+
+ // Convert to protobuf messages
+ messages := make([]*mq_pb.DataMessage, 0, len(logEntries))
+ for _, entry := range logEntries {
+ messages = append(messages, &mq_pb.DataMessage{
+ Key: entry.Key,
+ Value: entry.Data,
+ TsNs: entry.TsNs,
+ })
+ }
+
+ glog.V(4).Infof("[FetchMessage] Returning %d messages, nextOffset=%d, highWaterMark=%d, endOfPartition=%v",
+ len(messages), nextOffset, highWaterMark, endOfPartition)
+
+ return &mq_pb.FetchMessageResponse{
+ Messages: messages,
+ HighWaterMark: highWaterMark,
+ LogStartOffset: localPartition.LogBuffer.GetLogStartOffset(),
+ EndOfPartition: endOfPartition,
+ NextOffset: nextOffset,
+ }, nil
+}