aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go37
1 files changed, 37 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index b3abfb67d..5f5c2278f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -34,6 +34,9 @@ type LocalPartition struct {
publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn
Follower string
+
+ // Track last activity for idle cleanup
+ lastActivityTime atomic.Int64 // Unix nano timestamp
}
var TIME_FORMAT = "2006-01-02-15-04-05"
@@ -46,6 +49,7 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log
Subscribers: NewLocalPartitionSubscribers(),
}
lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+ lp.lastActivityTime.Store(time.Now().UnixNano()) // Initialize with current time
// Ensure a minimum flush interval to prevent busy-loop when set to 0
// A flush interval of 0 would cause time.Sleep(0) creating a CPU-consuming busy loop
@@ -65,6 +69,7 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message)
+ p.UpdateActivity() // Track publish activity for idle cleanup
// maybe send to the follower
if p.publishFolloweMeStream != nil {
@@ -90,11 +95,15 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
var readInMemoryLogErr error
var isDone bool
+ p.UpdateActivity() // Track subscribe activity for idle cleanup
+
// CRITICAL FIX: Use offset-based functions if startPosition is offset-based
// This allows reading historical data by offset, not just by timestamp
if startPosition.IsOffsetBased {
// Wrap eachMessageFn to match the signature expected by LoopProcessLogDataWithOffset
+ // Also update activity when messages are processed
eachMessageWithOffsetFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ p.UpdateActivity() // Track message read activity
return eachMessageFn(logEntry)
}
@@ -362,3 +371,31 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
// println("notifying", p.Follower, "flushed at", flushTsNs)
}
}
+
+// UpdateActivity updates the last activity timestamp for this partition
+// Should be called whenever a publisher publishes or a subscriber reads
+func (p *LocalPartition) UpdateActivity() {
+ p.lastActivityTime.Store(time.Now().UnixNano())
+}
+
+// IsIdle returns true if the partition has no publishers and no subscribers
+func (p *LocalPartition) IsIdle() bool {
+ return p.Publishers.Size() == 0 && p.Subscribers.Size() == 0
+}
+
+// GetIdleDuration returns how long the partition has been idle
+func (p *LocalPartition) GetIdleDuration() time.Duration {
+ lastActivity := p.lastActivityTime.Load()
+ return time.Since(time.Unix(0, lastActivity))
+}
+
+// ShouldCleanup returns true if the partition should be cleaned up
+// A partition should be cleaned up if:
+// 1. It has no publishers and no subscribers
+// 2. It has been idle for longer than the idle timeout
+func (p *LocalPartition) ShouldCleanup(idleTimeout time.Duration) bool {
+ if !p.IsIdle() {
+ return false
+ }
+ return p.GetIdleDuration() > idleTimeout
+}