aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go6
-rw-r--r--weed/mq/broker/broker_log_buffer_offset.go27
-rw-r--r--weed/mq/topic/local_partition.go22
-rw-r--r--weed/mq/topic/local_partition_offset.go7
4 files changed, 34 insertions, 28 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 117dc4f87..d8f472249 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -53,7 +53,11 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
// TODO: change this to DataMessage
// log the message
- logBuffer.AddToBuffer(dataMessage)
+ if addErr := logBuffer.AddToBuffer(dataMessage); addErr != nil {
+ err = fmt.Errorf("failed to add message to log buffer: %w", addErr)
+ glog.Errorf("Failed to add message to log buffer: %v", addErr)
+ break
+ }
// send back the ack
if err := stream.Send(&mq_pb.PublishFollowMeResponse{
diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go
index aeb8fad1b..104722af1 100644
--- a/weed/mq/broker/broker_log_buffer_offset.go
+++ b/weed/mq/broker/broker_log_buffer_offset.go
@@ -8,7 +8,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "google.golang.org/protobuf/proto"
)
// OffsetAssignmentFunc is a function type for assigning offsets to messages
@@ -30,13 +29,9 @@ func (b *MessageQueueBroker) AddToBufferWithOffset(
}
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
- var ts time.Time
processingTsNs := message.TsNs
if processingTsNs == 0 {
- ts = time.Now()
- processingTsNs = ts.UnixNano()
- } else {
- ts = time.Unix(0, processingTsNs)
+ processingTsNs = time.Now().UnixNano()
}
// Create LogEntry with assigned offset
@@ -48,33 +43,21 @@ func (b *MessageQueueBroker) AddToBufferWithOffset(
Offset: offset, // Add the assigned offset
}
- logEntryData, err := proto.Marshal(logEntry)
- if err != nil {
- return err
- }
-
// Use the existing LogBuffer infrastructure for the rest
// TODO: This is a workaround - ideally LogBuffer should handle offset assignment
// For now, we'll add the message with the pre-assigned offset
- return b.addLogEntryToBuffer(logBuffer, logEntry, logEntryData, ts)
+ return b.addLogEntryToBuffer(logBuffer, logEntry)
}
// addLogEntryToBuffer adds a pre-constructed LogEntry to the buffer
-// This is a helper function that mimics LogBuffer.AddDataToBuffer but with a pre-built LogEntry
+// This is a helper function that directly uses LogBuffer.AddLogEntryToBuffer
func (b *MessageQueueBroker) addLogEntryToBuffer(
logBuffer *log_buffer.LogBuffer,
logEntry *filer_pb.LogEntry,
- logEntryData []byte,
- ts time.Time,
) error {
- // TODO: This is a simplified version of LogBuffer.AddDataToBuffer
- // ASSUMPTION: We're bypassing some of the LogBuffer's internal logic
- // This should be properly integrated when LogBuffer is modified
-
- // Use the new AddLogEntryToBuffer method to preserve offset information
+ // Use the AddLogEntryToBuffer method to preserve offset information
// This ensures the offset is maintained throughout the entire data flow
- logBuffer.AddLogEntryToBuffer(logEntry)
- return nil
+ return logBuffer.AddLogEntryToBuffer(logEntry)
}
// GetPartitionOffsetInfoInternal returns offset information for a partition (internal method)
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 5f5c2278f..f03bca2f5 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -68,7 +68,9 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
- p.LogBuffer.AddToBuffer(message)
+ if err := p.LogBuffer.AddToBuffer(message); err != nil {
+ return fmt.Errorf("failed to add message to log buffer: %w", err)
+ }
p.UpdateActivity() // Track publish activity for idle cleanup
// maybe send to the follower
@@ -107,11 +109,17 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
return eachMessageFn(logEntry)
}
+ // Wrap eachMessageFn for disk reads to also update activity
+ eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ p.UpdateActivity() // Track disk read activity for idle cleanup
+ return eachMessageFn(logEntry)
+ }
+
// Always attempt initial disk read for historical data
// This is fast if no data on disk, and ensures we don't miss old data
// The memory read loop below handles new data with instant notifications
glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset)
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
@@ -145,7 +153,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
// Read from disk ONCE to catch up, then continue with in-memory buffer
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset)
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
@@ -175,8 +183,14 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
}
// Original timestamp-based subscription logic
+ // Wrap eachMessageFn for disk reads to also update activity
+ eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ p.UpdateActivity() // Track disk read activity for idle cleanup
+ return eachMessageFn(logEntry)
+ }
+
for {
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go
index e15234ca0..9c8a2dac4 100644
--- a/weed/mq/topic/local_partition_offset.go
+++ b/weed/mq/topic/local_partition_offset.go
@@ -28,6 +28,9 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff
return 0, fmt.Errorf("failed to add message to buffer: %w", err)
}
+ // Track publish activity for idle cleanup (consistent with Publish method)
+ p.UpdateActivity()
+
// Send to follower if needed (same logic as original Publish)
if p.publishFolloweMeStream != nil {
if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
@@ -62,7 +65,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse
}
// Add the entry to the buffer in a way that preserves offset on disk and in-memory
- p.LogBuffer.AddLogEntryToBuffer(logEntry)
+ if err := p.LogBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+ return fmt.Errorf("failed to add log entry to buffer: %w", err)
+ }
return nil
}