about summary refs log tree commit diff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r-- weed/mq/topic/local_partition.go 127
1 files changed, 123 insertions, 4 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index dfe7c410f..b3abfb67d 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -3,12 +3,14 @@ package topic
import (
"context"
"fmt"
+ "strings"
"sync"
"sync/atomic"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc"
@@ -37,15 +39,23 @@ type LocalPartition struct {
var TIME_FORMAT = "2006-01-02-15-04-05"
var PartitionGenerationFormat = "v2006-01-02-15-04-05"
-func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
lp := &LocalPartition{
Partition: partition,
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+
+ // Ensure a minimum flush interval to prevent busy-loop when set to 0
+ // A flush interval of 0 would cause time.Sleep(0) creating a CPU-consuming busy loop
+ flushInterval := time.Duration(logFlushInterval) * time.Second
+ if flushInterval == 0 {
+ flushInterval = 1 * time.Second // Minimum 1 second to avoid busy-loop, allow near-immediate flushing
+ }
+
lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
- 2*time.Minute, logFlushFn, readFromDiskFn, func() {
+ flushInterval, logFlushFn, readFromDiskFn, func() {
if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
lp.ListenersCond.Broadcast()
}
@@ -80,6 +90,82 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
var readInMemoryLogErr error
var isDone bool
+ // CRITICAL FIX: Use offset-based functions if startPosition is offset-based
+ // This allows reading historical data by offset, not just by timestamp
+ if startPosition.IsOffsetBased {
+ // Wrap eachMessageFn to match the signature expected by LoopProcessLogDataWithOffset
+ eachMessageWithOffsetFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return eachMessageFn(logEntry)
+ }
+
+ // Always attempt initial disk read for historical data
+ // This is fast if no data on disk, and ensures we don't miss old data
+ // The memory read loop below handles new data with instant notifications
+ glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ if readPersistedLogErr != nil {
+ glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
+ return readPersistedLogErr
+ }
+ if isDone {
+ return nil
+ }
+
+ // Update position after reading from disk
+ if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+ startPosition = processedPosition
+ }
+
+ // Step 2: Enter the main loop - read from in-memory buffer, occasionally checking disk
+ for {
+ // Read from in-memory buffer (this is the hot path - handles streaming data)
+ glog.V(4).Infof("SUBSCRIBE: Reading from in-memory buffer for %s at offset %d", clientName, startPosition.Offset)
+ processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogDataWithOffset(clientName, startPosition, 0, onNoMessageFn, eachMessageWithOffsetFn)
+
+ if isDone {
+ return nil
+ }
+
+ // Update position
+ // CRITICAL FIX: For offset-based reads, Time is zero, so also accept positions flagged IsOffsetBased
+ if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+ startPosition = processedPosition
+ }
+
+ // If we get ResumeFromDiskError, it means data was flushed to disk
+ // Read from disk ONCE to catch up, then continue with in-memory buffer
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset)
+ processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+ if readPersistedLogErr != nil {
+ glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr)
+ return readPersistedLogErr
+ }
+ if isDone {
+ return nil
+ }
+
+ // Update position and continue the loop (back to in-memory buffer)
+ // CRITICAL FIX: For offset-based reads, Time is zero, so also accept positions flagged IsOffsetBased
+ if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+ startPosition = processedPosition
+ }
+ // Loop continues - back to reading from in-memory buffer
+ continue
+ }
+
+ // Any other error is a real error
+ if readInMemoryLogErr != nil {
+ glog.V(2).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
+ return readInMemoryLogErr
+ }
+
+ // If we get here with no error and not done, something is wrong
+ glog.V(1).Infof("SUBSCRIBE: Unexpected state for %s - no error but not done, continuing", clientName)
+ }
+ }
+
+ // Original timestamp-based subscription logic
for {
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
if readPersistedLogErr != nil {
@@ -90,14 +176,16 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
return nil
}
- if processedPosition.Time.UnixNano() != 0 {
+ // CRITICAL FIX: For offset-based reads, Time is zero, so also accept positions flagged IsOffsetBased
+ if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
startPosition = processedPosition
}
processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
if isDone {
return nil
}
- if processedPosition.Time.UnixNano() != 0 {
+ // CRITICAL FIX: For offset-based reads, Time is zero, so also accept positions flagged IsOffsetBased
+ if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
startPosition = processedPosition
}
@@ -222,6 +310,37 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
return
}
+// MaybeShutdownLocalPartitionForTopic is the topic-aware variant of
+// MaybeShutdownLocalPartition. For a system topic (see isSystemTopic) it only
+// logs the Publishers and Subscribers sizes and reports false; every other
+// topic is delegated to MaybeShutdownLocalPartition and its result returned.
+func (p *LocalPartition) MaybeShutdownLocalPartitionForTopic(topicName string) (hasShutdown bool) {
+	if !isSystemTopic(topicName) {
+		return p.MaybeShutdownLocalPartition()
+	}
+	glog.V(0).Infof("System topic %s - skipping aggressive shutdown for partition %v (Publishers:%d Subscribers:%d)",
+		topicName, p.Partition, p.Publishers.Size(), p.Subscribers.Size())
+	return false
+}
+
+// isSystemTopic reports whether topicName is treated as a system topic, which
+// MaybeShutdownLocalPartitionForTopic refuses to shut down.
+//
+// Every name that starts with an underscore qualifies. That single prefix test
+// already covers the well-known system topics, so they need no explicit list:
+//
+//   - "_schemas"            (Schema Registry topic)
+//   - "__consumer_offsets"  (Kafka consumer offsets topic)
+//   - "__transaction_state" (Kafka transaction state topic)
+//
+// A separate check for the "__" prefix would likewise be redundant, because
+// any name starting with "__" also starts with "_".
+//
+// The empty string is not a system topic.
+func isSystemTopic(topicName string) bool {
+	return strings.HasPrefix(topicName, "_")
+}
+
func (p *LocalPartition) Shutdown() {
p.closePublishers()
p.closeSubscribers()