aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go22
1 files changed, 18 insertions, 4 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 5f5c2278f..f03bca2f5 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -68,7 +68,9 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
- p.LogBuffer.AddToBuffer(message)
+ if err := p.LogBuffer.AddToBuffer(message); err != nil {
+ return fmt.Errorf("failed to add message to log buffer: %w", err)
+ }
p.UpdateActivity() // Track publish activity for idle cleanup
// maybe send to the follower
@@ -107,11 +109,17 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
return eachMessageFn(logEntry)
}
+ // Wrap eachMessageFn for disk reads to also update activity
+ eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ p.UpdateActivity() // Track disk read activity for idle cleanup
+ return eachMessageFn(logEntry)
+ }
+
// Always attempt initial disk read for historical data
// This is fast if no data on disk, and ensures we don't miss old data
// The memory read loop below handles new data with instant notifications
glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset)
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
@@ -145,7 +153,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
// Read from disk ONCE to catch up, then continue with in-memory buffer
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset)
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
@@ -175,8 +183,14 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
}
// Original timestamp-based subscription logic
+ // Wrap eachMessageFn for disk reads to also update activity
+ eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ p.UpdateActivity() // Track disk read activity for idle cleanup
+ return eachMessageFn(logEntry)
+ }
+
for {
- processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr