1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
package broker
import (
	"errors"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
// OffsetAssignmentFunc is a function type for assigning offsets to messages.
// It yields an int64 offset, or an error when no offset could be assigned.
// NOTE(review): not referenced in this section of the file; confirm where it is
// used before changing its signature.
type OffsetAssignmentFunc func() (int64, error)
// AddToBufferWithOffset assigns the next offset for partition p of topic t via
// the broker's offset manager and appends message to logBuffer as a LogEntry
// carrying that offset.
//
// The entry's timestamp is message.TsNs, or the current wall-clock time when
// message.TsNs is zero. The partition key hash is computed from message.Key.
//
// It returns an error, without assigning an offset, when message or logBuffer
// is nil. Errors from offset assignment and from appending to the buffer are
// wrapped with context (use errors.Is/errors.As to inspect the cause). If the
// append fails after an offset was assigned, this function does not release
// that offset.
//
// TODO: temporary until LogBuffer can assign offsets itself; this is expected
// to be folded into LogBuffer.AddToBuffer.
func (b *MessageQueueBroker) AddToBufferWithOffset(
	logBuffer *log_buffer.LogBuffer,
	message *mq_pb.DataMessage,
	t topic.Topic,
	p topic.Partition,
) error {
	// Validate before assigning an offset so a bad call does not consume one
	// (a nil message would otherwise be dereferenced after assignment).
	if message == nil {
		return errors.New("add to buffer with offset: nil message")
	}
	if logBuffer == nil {
		return errors.New("add to buffer with offset: nil log buffer")
	}

	offset, err := b.offsetManager.AssignOffset(t, p)
	if err != nil {
		return fmt.Errorf("assign offset: %w", err)
	}

	tsNs := message.TsNs
	if tsNs == 0 {
		tsNs = time.Now().UnixNano()
	}

	logEntry := &filer_pb.LogEntry{
		TsNs:             tsNs,
		PartitionKeyHash: util.HashToInt32(message.Key),
		Data:             message.Value,
		Key:              message.Key,
		Offset:           offset, // pre-assigned offset travels with the entry
	}

	if err := b.addLogEntryToBuffer(logBuffer, logEntry); err != nil {
		return fmt.Errorf("append log entry at offset %d: %w", offset, err)
	}
	return nil
}
// addLogEntryToBuffer hands an already-built LogEntry to
// logBuffer.AddLogEntryToBuffer and returns that call's result. The entry is
// passed through untouched, so caller-set fields such as Offset reach the
// buffer as-is.
func (b *MessageQueueBroker) addLogEntryToBuffer(logBuffer *log_buffer.LogBuffer, logEntry *filer_pb.LogEntry) error {
	err := logBuffer.AddLogEntryToBuffer(logEntry)
	return err
}
// GetPartitionOffsetInfoInternal returns offset information for partition p of
// topic t (internal method).
//
// The offset manager's high water mark is combined with the in-memory
// LogBuffer of the local partition, when this broker has one. Messages written
// to the LogBuffer but not yet flushed may not be reflected by the offset
// manager, so the larger of the two high water marks is reported. Errors from
// the offset manager are returned unchanged.
func (b *MessageQueueBroker) GetPartitionOffsetInfoInternal(t topic.Topic, p topic.Partition) (*PartitionOffsetInfo, error) {
	info, err := b.offsetManager.GetPartitionOffsetInfo(t, p)
	if err != nil {
		return nil, err
	}

	highWaterMark := info.HighWaterMark

	localPartition := b.localTopicManager.GetLocalPartition(t, p)
	if localPartition != nil && localPartition.LogBuffer != nil {
		// NOTE(review): assumes LogBuffer.GetOffset returns the next offset to be
		// assigned (exclusive, like info.HighWaterMark); negative values are
		// ignored as "unknown" — confirm against log_buffer.
		bufferHWM := localPartition.LogBuffer.GetOffset()
		if bufferHWM >= 0 && bufferHWM > highWaterMark {
			highWaterMark = bufferHWM
		}
	}

	// The latest assigned offset is HWM-1. For an empty partition (HWM == 0)
	// this is -1, meaning "no records".
	latestOffset := highWaterMark - 1

	return &PartitionOffsetInfo{
		Topic:          t,
		Partition:      p,
		EarliestOffset: info.EarliestOffset,
		LatestOffset:   latestOffset,
		HighWaterMark:  highWaterMark,
		// NOTE(review): HWM equals the record count only if offsets start at 0;
		// confirm whether EarliestOffset can exceed 0 (e.g. after retention).
		RecordCount:         highWaterMark,
		ActiveSubscriptions: info.ActiveSubscriptions,
	}, nil
}
// PartitionOffsetInfo is the broker-specific summary of a partition's offsets,
// as returned by GetPartitionOffsetInfoInternal.
type PartitionOffsetInfo struct {
	Topic     topic.Topic
	Partition topic.Partition

	// Offset bounds. GetPartitionOffsetInfoInternal sets LatestOffset to
	// HighWaterMark-1 (so -1 when HighWaterMark is 0).
	EarliestOffset, LatestOffset, HighWaterMark int64

	// GetPartitionOffsetInfoInternal sets RecordCount equal to HighWaterMark.
	RecordCount, ActiveSubscriptions int64
}
// CreateOffsetSubscription registers an offset-based subscription named
// subscriptionID for partition p of topic t with the broker's offset manager,
// passing startOffset through. The created subscription object is discarded;
// only the manager's error is returned.
//
// NOTE(review): offsetType is currently ignored. The subscription is always
// created with offset type 0, which the previous code documented as
// schema_pb.OffsetType_RESET_TO_EARLIEST.
// TODO: map offsetType to schema_pb.OffsetType.
func (b *MessageQueueBroker) CreateOffsetSubscription(
	subscriptionID string,
	t topic.Topic,
	p topic.Partition,
	offsetType string, // not yet used; see TODO above
	startOffset int64,
) error {
	const defaultOffsetType = 0 // schema_pb.OffsetType_RESET_TO_EARLIEST
	if _, err := b.offsetManager.CreateSubscription(subscriptionID, t, p, defaultOffsetType, startOffset); err != nil {
		return err
	}
	return nil
}
// GetOffsetMetrics exposes the offset manager's metrics as a generic map for
// monitoring. The map has the keys "partition_count", "total_offsets",
// "active_subscriptions" and "average_latency", with values copied as-is from
// the offset manager's metrics.
func (b *MessageQueueBroker) GetOffsetMetrics() map[string]interface{} {
	snapshot := b.offsetManager.GetOffsetMetrics()

	result := make(map[string]interface{}, 4)
	result["partition_count"] = snapshot.PartitionCount
	result["total_offsets"] = snapshot.TotalOffsets
	result["active_subscriptions"] = snapshot.ActiveSubscriptions
	result["average_latency"] = snapshot.AverageLatency
	return result
}
|