aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition_offset.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition_offset.go')
-rw-r--r--weed/mq/topic/local_partition_offset.go7
1 files changed, 6 insertions, 1 deletions
diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go
index e15234ca0..9c8a2dac4 100644
--- a/weed/mq/topic/local_partition_offset.go
+++ b/weed/mq/topic/local_partition_offset.go
@@ -28,6 +28,9 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff
return 0, fmt.Errorf("failed to add message to buffer: %w", err)
}
+ // Track publish activity for idle cleanup (consistent with Publish method)
+ p.UpdateActivity()
+
// Send to follower if needed (same logic as original Publish)
if p.publishFolloweMeStream != nil {
if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
@@ -62,7 +65,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse
}
// Add the entry to the buffer in a way that preserves offset on disk and in-memory
- p.LogBuffer.AddLogEntryToBuffer(logEntry)
+ if err := p.LogBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+ return fmt.Errorf("failed to add log entry to buffer: %w", err)
+ }
return nil
}