aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_log_buffer_offset.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_log_buffer_offset.go')
-rw-r--r--weed/mq/broker/broker_log_buffer_offset.go169
1 files changed, 169 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go
new file mode 100644
index 000000000..aeb8fad1b
--- /dev/null
+++ b/weed/mq/broker/broker_log_buffer_offset.go
@@ -0,0 +1,169 @@
+package broker
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/protobuf/proto"
+)
+
+// OffsetAssignmentFunc is a function type for assigning offsets to messages
+type OffsetAssignmentFunc func() (int64, error)
+
+// AddToBufferWithOffset adds a message to the log buffer with offset assignment
+// TODO: This is a temporary solution until LogBuffer can be modified to accept offset assignment
+// ASSUMPTION: This function will be integrated into LogBuffer.AddToBuffer in the future
+func (b *MessageQueueBroker) AddToBufferWithOffset(
+ logBuffer *log_buffer.LogBuffer,
+ message *mq_pb.DataMessage,
+ t topic.Topic,
+ p topic.Partition,
+) error {
+ // Assign offset for this message
+ offset, err := b.offsetManager.AssignOffset(t, p)
+ if err != nil {
+ return err
+ }
+
+ // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
+ var ts time.Time
+ processingTsNs := message.TsNs
+ if processingTsNs == 0 {
+ ts = time.Now()
+ processingTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, processingTsNs)
+ }
+
+ // Create LogEntry with assigned offset
+ logEntry := &filer_pb.LogEntry{
+ TsNs: processingTsNs,
+ PartitionKeyHash: util.HashToInt32(message.Key),
+ Data: message.Value,
+ Key: message.Key,
+ Offset: offset, // Add the assigned offset
+ }
+
+ logEntryData, err := proto.Marshal(logEntry)
+ if err != nil {
+ return err
+ }
+
+ // Use the existing LogBuffer infrastructure for the rest
+ // TODO: This is a workaround - ideally LogBuffer should handle offset assignment
+ // For now, we'll add the message with the pre-assigned offset
+ return b.addLogEntryToBuffer(logBuffer, logEntry, logEntryData, ts)
+}
+
+// addLogEntryToBuffer adds a pre-constructed LogEntry to the buffer
+// This is a helper function that mimics LogBuffer.AddDataToBuffer but with a pre-built LogEntry
+func (b *MessageQueueBroker) addLogEntryToBuffer(
+ logBuffer *log_buffer.LogBuffer,
+ logEntry *filer_pb.LogEntry,
+ logEntryData []byte,
+ ts time.Time,
+) error {
+ // TODO: This is a simplified version of LogBuffer.AddDataToBuffer
+ // ASSUMPTION: We're bypassing some of the LogBuffer's internal logic
+ // This should be properly integrated when LogBuffer is modified
+
+ // Use the new AddLogEntryToBuffer method to preserve offset information
+ // This ensures the offset is maintained throughout the entire data flow
+ logBuffer.AddLogEntryToBuffer(logEntry)
+ return nil
+}
+
+// GetPartitionOffsetInfoInternal returns offset information for a partition (internal method)
+func (b *MessageQueueBroker) GetPartitionOffsetInfoInternal(t topic.Topic, p topic.Partition) (*PartitionOffsetInfo, error) {
+ info, err := b.offsetManager.GetPartitionOffsetInfo(t, p)
+ if err != nil {
+ return nil, err
+ }
+
+ // CRITICAL FIX: Also check LogBuffer for in-memory messages
+ // The offset manager only tracks assigned offsets from persistent storage
+ // But the LogBuffer contains recently written messages that haven't been flushed yet
+ localPartition := b.localTopicManager.GetLocalPartition(t, p)
+ logBufferHWM := int64(-1)
+ if localPartition != nil && localPartition.LogBuffer != nil {
+ logBufferHWM = localPartition.LogBuffer.GetOffset()
+ } else {
+ }
+
+ // Use the MAX of offset manager HWM and LogBuffer HWM
+ // This ensures we report the correct HWM even if data hasn't been flushed to disk yet
+ // IMPORTANT: Use >= not > because when they're equal, we still want the correct value
+ highWaterMark := info.HighWaterMark
+ if logBufferHWM >= 0 && logBufferHWM > highWaterMark {
+ highWaterMark = logBufferHWM
+ } else if logBufferHWM >= 0 && logBufferHWM == highWaterMark && highWaterMark > 0 {
+ } else if logBufferHWM >= 0 {
+ }
+
+ // Latest offset is HWM - 1 (last assigned offset)
+ latestOffset := highWaterMark - 1
+ if highWaterMark == 0 {
+ latestOffset = -1 // No records
+ }
+
+ // Convert to broker-specific format
+ return &PartitionOffsetInfo{
+ Topic: t,
+ Partition: p,
+ EarliestOffset: info.EarliestOffset,
+ LatestOffset: latestOffset,
+ HighWaterMark: highWaterMark,
+ RecordCount: highWaterMark, // HWM equals record count (offsets 0 to HWM-1)
+ ActiveSubscriptions: info.ActiveSubscriptions,
+ }, nil
+}
+
+// PartitionOffsetInfo provides offset information for a partition (broker-specific)
+type PartitionOffsetInfo struct {
+ Topic topic.Topic
+ Partition topic.Partition
+ EarliestOffset int64
+ LatestOffset int64
+ HighWaterMark int64
+ RecordCount int64
+ ActiveSubscriptions int64
+}
+
+// CreateOffsetSubscription creates an offset-based subscription through the broker
+func (b *MessageQueueBroker) CreateOffsetSubscription(
+ subscriptionID string,
+ t topic.Topic,
+ p topic.Partition,
+ offsetType string, // Will be converted to schema_pb.OffsetType
+ startOffset int64,
+) error {
+ // TODO: Convert string offsetType to schema_pb.OffsetType
+ // ASSUMPTION: For now using RESET_TO_EARLIEST as default
+ // This should be properly mapped based on the offsetType parameter
+
+ _, err := b.offsetManager.CreateSubscription(
+ subscriptionID,
+ t,
+ p,
+ 0, // schema_pb.OffsetType_RESET_TO_EARLIEST
+ startOffset,
+ )
+
+ return err
+}
+
+// GetOffsetMetrics returns offset metrics for monitoring
+func (b *MessageQueueBroker) GetOffsetMetrics() map[string]interface{} {
+ metrics := b.offsetManager.GetOffsetMetrics()
+
+ return map[string]interface{}{
+ "partition_count": metrics.PartitionCount,
+ "total_offsets": metrics.TotalOffsets,
+ "active_subscriptions": metrics.ActiveSubscriptions,
+ "average_latency": metrics.AverageLatency,
+ }
+}