aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_log_buffer_offset.go
blob: 104722af10312b0a204343fcf15ca158c8a0aef3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package broker

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// OffsetAssignmentFunc is the signature of a callback that assigns an offset
// to a message. It returns the assigned offset, or an error if no offset
// could be assigned.
type OffsetAssignmentFunc func() (int64, error)

// AddToBufferWithOffset assigns the next offset for partition p of topic t and
// appends message to logBuffer as a LogEntry carrying that offset.
//
// The entry's timestamp is message.TsNs, or the current wall-clock time in
// nanoseconds when message.TsNs is zero. The partition key hash is derived
// from message.Key.
//
// An error from offset assignment is returned as-is and nothing is appended.
// Otherwise the result of appending the entry to the buffer is returned.
//
// TODO: This is a temporary solution; offset assignment is expected to move
// into LogBuffer itself in the future.
func (b *MessageQueueBroker) AddToBufferWithOffset(
	logBuffer *log_buffer.LogBuffer,
	message *mq_pb.DataMessage,
	t topic.Topic,
	p topic.Partition,
) error {
	assignedOffset, assignErr := b.offsetManager.AssignOffset(t, p)
	if assignErr != nil {
		return assignErr
	}

	// Fall back to "now" when the producer did not supply a timestamp.
	tsNs := message.TsNs
	if tsNs == 0 {
		tsNs = time.Now().UnixNano()
	}

	entry := &filer_pb.LogEntry{
		Offset:           assignedOffset,
		TsNs:             tsNs,
		Key:              message.Key,
		Data:             message.Value,
		PartitionKeyHash: util.HashToInt32(message.Key),
	}

	// The entry already carries its offset; hand it to the buffer unchanged.
	return b.addLogEntryToBuffer(logBuffer, entry)
}

// addLogEntryToBuffer appends an already-built LogEntry to logBuffer by
// delegating to LogBuffer.AddLogEntryToBuffer and returns that call's error.
// The entry is passed through untouched, so any offset set on it by the
// caller is what the buffer receives.
func (b *MessageQueueBroker) addLogEntryToBuffer(logBuffer *log_buffer.LogBuffer, logEntry *filer_pb.LogEntry) error {
	return logBuffer.AddLogEntryToBuffer(logEntry)
}

// GetPartitionOffsetInfoInternal returns offset information for a partition (internal method).
//
// It starts from the offset manager's view of partition p of topic t. When a
// local partition with a LogBuffer exists, the reported high-water mark is
// raised to the LogBuffer's offset if that value is non-negative and strictly
// greater than the offset manager's high-water mark. Per the original author's
// note, this covers messages that sit in the in-memory buffer and have not
// been flushed yet.
// NOTE(review): this treats LogBuffer.GetOffset() as a high-water mark (next
// offset to assign) rather than the last assigned offset — confirm.
//
// An error from the offset manager is returned unchanged with a nil result.
func (b *MessageQueueBroker) GetPartitionOffsetInfoInternal(t topic.Topic, p topic.Partition) (*PartitionOffsetInfo, error) {
	info, err := b.offsetManager.GetPartitionOffsetInfo(t, p)
	if err != nil {
		return nil, err
	}

	// -1 means "no LogBuffer available"; it never wins the comparison below.
	logBufferHWM := int64(-1)
	if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil && localPartition.LogBuffer != nil {
		logBufferHWM = localPartition.LogBuffer.GetOffset()
	}

	// Report the larger of the two high-water marks. When they are equal the
	// offset manager's value is already correct, so a strict comparison suffices.
	highWaterMark := info.HighWaterMark
	if logBufferHWM >= 0 && logBufferHWM > highWaterMark {
		highWaterMark = logBufferHWM
	}

	// The latest offset is the last assigned one, i.e. HWM - 1. With no
	// records (HWM == 0) this naturally yields -1.
	latestOffset := highWaterMark - 1

	// Convert to the broker-specific format.
	return &PartitionOffsetInfo{
		Topic:               t,
		Partition:           p,
		EarliestOffset:      info.EarliestOffset,
		LatestOffset:        latestOffset,
		HighWaterMark:       highWaterMark,
		RecordCount:         highWaterMark, // HWM equals record count (offsets 0 to HWM-1)
		ActiveSubscriptions: info.ActiveSubscriptions,
	}, nil
}

// PartitionOffsetInfo provides offset information for a partition (broker-specific).
// It is the result type of GetPartitionOffsetInfoInternal; the field notes
// below describe how that method populates it.
type PartitionOffsetInfo struct {
	// Topic is the topic the information was requested for.
	Topic               topic.Topic
	// Partition is the partition the information was requested for.
	Partition           topic.Partition
	// EarliestOffset is copied from the offset manager's partition info.
	EarliestOffset      int64
	// LatestOffset is HighWaterMark - 1, so it is -1 when HighWaterMark is 0.
	LatestOffset        int64
	// HighWaterMark is the larger of the offset manager's high-water mark and
	// the local LogBuffer's offset (when a LogBuffer is available).
	HighWaterMark       int64
	// RecordCount is set to the same value as HighWaterMark.
	RecordCount         int64
	// ActiveSubscriptions is copied from the offset manager's partition info.
	ActiveSubscriptions int64
}

// CreateOffsetSubscription creates an offset-based subscription through the broker
// by registering subscriptionID for partition p of topic t with the offset
// manager, starting at startOffset. Only the error from the offset manager is
// returned; the subscription value it produces is discarded.
//
// TODO: offsetType is currently ignored. It is meant to be converted to a
// schema_pb.OffsetType, but for now a fixed offset type of 0 is always used.
// NOTE(review): the original comment labels 0 as
// schema_pb.OffsetType_RESET_TO_EARLIEST — confirm against the enum definition.
func (b *MessageQueueBroker) CreateOffsetSubscription(
	subscriptionID string,
	t topic.Topic,
	p topic.Partition,
	offsetType string, // Will be converted to schema_pb.OffsetType
	startOffset int64,
) error {
	// Placeholder until offsetType is mapped properly (see TODO above).
	const defaultOffsetType = 0

	_, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, defaultOffsetType, startOffset)
	return err
}

// GetOffsetMetrics returns offset metrics for monitoring as a generic map.
// The values are read from the offset manager's metrics and exposed under the
// keys "partition_count", "total_offsets", "active_subscriptions" and
// "average_latency".
func (b *MessageQueueBroker) GetOffsetMetrics() map[string]interface{} {
	src := b.offsetManager.GetOffsetMetrics()

	out := make(map[string]interface{}, 4)
	out["partition_count"] = src.PartitionCount
	out["total_offsets"] = src.TotalOffsets
	out["active_subscriptions"] = src.ActiveSubscriptions
	out["average_latency"] = src.AverageLatency
	return out
}