diff options
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index b3abfb67d..5f5c2278f 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -34,6 +34,9 @@ type LocalPartition struct { publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient followerGrpcConnection *grpc.ClientConn Follower string + + // Track last activity for idle cleanup + lastActivityTime atomic.Int64 // Unix nano timestamp } var TIME_FORMAT = "2006-01-02-15-04-05" @@ -46,6 +49,7 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log Subscribers: NewLocalPartitionSubscribers(), } lp.ListenersCond = sync.NewCond(&lp.ListenersLock) + lp.lastActivityTime.Store(time.Now().UnixNano()) // Initialize with current time // Ensure a minimum flush interval to prevent busy-loop when set to 0 // A flush interval of 0 would cause time.Sleep(0) creating a CPU-consuming busy loop @@ -65,6 +69,7 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { p.LogBuffer.AddToBuffer(message) + p.UpdateActivity() // Track publish activity for idle cleanup // maybe send to the follower if p.publishFolloweMeStream != nil { @@ -90,11 +95,15 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M var readInMemoryLogErr error var isDone bool + p.UpdateActivity() // Track subscribe activity for idle cleanup + // CRITICAL FIX: Use offset-based functions if startPosition is offset-based // This allows reading historical data by offset, not just by timestamp if startPosition.IsOffsetBased { // Wrap eachMessageFn to match the signature expected by LoopProcessLogDataWithOffset + // Also update activity when messages are processed eachMessageWithOffsetFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + p.UpdateActivity() // Track message read activity return eachMessageFn(logEntry) } @@ -362,3 +371,31 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { // println("notifying", p.Follower, "flushed at", flushTsNs) } } + +// UpdateActivity updates the last activity timestamp for this partition +// Should be called whenever a publisher publishes or a subscriber reads +func (p *LocalPartition) UpdateActivity() { + p.lastActivityTime.Store(time.Now().UnixNano()) +} + +// IsIdle returns true if the partition has no publishers and no subscribers +func (p *LocalPartition) IsIdle() bool { + return p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 +} + +// GetIdleDuration returns how long the partition has been idle +func (p *LocalPartition) GetIdleDuration() time.Duration { + lastActivity := p.lastActivityTime.Load() + return time.Since(time.Unix(0, lastActivity)) +} + +// ShouldCleanup returns true if the partition should be cleaned up +// A partition should be cleaned up if: +// 1. It has no publishers and no subscribers +// 2. It has been idle for longer than the idle timeout +func (p *LocalPartition) ShouldCleanup(idleTimeout time.Duration) bool { + if !p.IsIdle() { + return false + } + return p.GetIdleDuration() > idleTimeout +} |
