diff options
Diffstat (limited to 'weed/mq/topic/local_partition_offset.go')
| -rw-r--r-- | weed/mq/topic/local_partition_offset.go | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go index e15234ca0..9c8a2dac4 100644 --- a/weed/mq/topic/local_partition_offset.go +++ b/weed/mq/topic/local_partition_offset.go @@ -28,6 +28,9 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff return 0, fmt.Errorf("failed to add message to buffer: %w", err) } + // Track publish activity for idle cleanup (consistent with Publish method) + p.UpdateActivity() + // Send to follower if needed (same logic as original Publish) if p.publishFolloweMeStream != nil { if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ @@ -62,7 +65,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse } // Add the entry to the buffer in a way that preserves offset on disk and in-memory - p.LogBuffer.AddLogEntryToBuffer(logEntry) + if err := p.LogBuffer.AddLogEntryToBuffer(logEntry); err != nil { + return fmt.Errorf("failed to add log entry to buffer: %w", err) + } return nil } |
