Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--   weed/mq/topic/local_partition.go   127
1 file changed, 123 insertions, 4 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index dfe7c410f..b3abfb67d 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -3,12 +3,14 @@ package topic
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/grpc"
@@ -37,15 +39,23 @@ type LocalPartition struct {
 var TIME_FORMAT = "2006-01-02-15-04-05"
 var PartitionGenerationFormat = "v2006-01-02-15-04-05"
 
-func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
 	lp := &LocalPartition{
 		Partition:   partition,
 		Publishers:  NewLocalPartitionPublishers(),
 		Subscribers: NewLocalPartitionSubscribers(),
 	}
 	lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+
+	// Ensure a minimum flush interval to prevent busy-loop when set to 0
+	// A flush interval of 0 would cause time.Sleep(0) creating a CPU-consuming busy loop
+	flushInterval := time.Duration(logFlushInterval) * time.Second
+	if flushInterval == 0 {
+		flushInterval = 1 * time.Second // Minimum 1 second to avoid busy-loop, allow near-immediate flushing
+	}
+
 	lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
-		2*time.Minute, logFlushFn, readFromDiskFn, func() {
+		flushInterval, logFlushFn, readFromDiskFn, func() {
 		if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
 			lp.ListenersCond.Broadcast()
 		}
@@ -80,6 +90,82 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
 	var readInMemoryLogErr error
 	var isDone bool
 
+	// CRITICAL FIX: Use offset-based functions if startPosition is offset-based
+	// This allows reading historical data by offset, not just by timestamp
+	if startPosition.IsOffsetBased {
+		// Wrap eachMessageFn to match the signature expected by LoopProcessLogDataWithOffset
+		eachMessageWithOffsetFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+			return eachMessageFn(logEntry)
+		}
+
+		// Always attempt initial disk read for historical data
+		// This is fast if no data on disk, and ensures we don't miss old data
+		// The memory read loop below handles new data with instant notifications
+		glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset)
+		processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+		if readPersistedLogErr != nil {
+			glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
+			return readPersistedLogErr
+		}
+		if isDone {
+			return nil
+		}
+
+		// Update position after reading from disk
+		if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+			startPosition = processedPosition
+		}
+
+		// Step 2: Enter the main loop - read from in-memory buffer, occasionally checking disk
+		for {
+			// Read from in-memory buffer (this is the hot path - handles streaming data)
+			glog.V(4).Infof("SUBSCRIBE: Reading from in-memory buffer for %s at offset %d", clientName, startPosition.Offset)
+			processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogDataWithOffset(clientName, startPosition, 0, onNoMessageFn, eachMessageWithOffsetFn)
+
+			if isDone {
+				return nil
+			}
+
+			// Update position
+			// CRITICAL FIX: For offset-based reads, Time is zero, so check Offset instead
+			if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+				startPosition = processedPosition
+			}
+
+			// If we get ResumeFromDiskError, it means data was flushed to disk
+			// Read from disk ONCE to catch up, then continue with in-memory buffer
+			if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+				glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset)
+				processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
+				if readPersistedLogErr != nil {
+					glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr)
+					return readPersistedLogErr
+				}
+				if isDone {
+					return nil
+				}
+
+				// Update position and continue the loop (back to in-memory buffer)
+				// CRITICAL FIX: For offset-based reads, Time is zero, so check Offset instead
+				if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
+					startPosition = processedPosition
+				}
+				// Loop continues - back to reading from in-memory buffer
+				continue
+			}
+
+			// Any other error is a real error
+			if readInMemoryLogErr != nil {
+				glog.V(2).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
+				return readInMemoryLogErr
+			}
+
+			// If we get here with no error and not done, something is wrong
+			glog.V(1).Infof("SUBSCRIBE: Unexpected state for %s - no error but not done, continuing", clientName)
+		}
+	}
+
+	// Original timestamp-based subscription logic
 	for {
 		processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
 		if readPersistedLogErr != nil {
@@ -90,14 +176,16 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
 			return nil
 		}
 
-		if processedPosition.Time.UnixNano() != 0 {
+		// CRITICAL FIX: For offset-based reads, Time is zero, so check Offset instead
+		if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
 			startPosition = processedPosition
 		}
 		processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
 		if isDone {
 			return nil
 		}
-		if processedPosition.Time.UnixNano() != 0 {
+		// CRITICAL FIX: For offset-based reads, Time is zero, so check Offset instead
+		if processedPosition.Time.UnixNano() != 0 || processedPosition.IsOffsetBased {
 			startPosition = processedPosition
 		}
 
@@ -222,6 +310,37 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
 	return
 }
 
+// MaybeShutdownLocalPartitionForTopic is a topic-aware version that considers system topic retention
+func (p *LocalPartition) MaybeShutdownLocalPartitionForTopic(topicName string) (hasShutdown bool) {
+	// For system topics like _schemas, be more conservative about shutdown
+	if isSystemTopic(topicName) {
+		glog.V(0).Infof("System topic %s - skipping aggressive shutdown for partition %v (Publishers:%d Subscribers:%d)",
+			topicName, p.Partition, p.Publishers.Size(), p.Subscribers.Size())
+		return false
+	}
+
+	// For regular topics, use the standard shutdown logic
+	return p.MaybeShutdownLocalPartition()
+}
+
+// isSystemTopic checks if a topic should have special retention behavior
+func isSystemTopic(topicName string) bool {
+	systemTopics := []string{
+		"_schemas",            // Schema Registry topic
+		"__consumer_offsets",  // Kafka consumer offsets topic
+		"__transaction_state", // Kafka transaction state topic
+	}
+
+	for _, systemTopic := range systemTopics {
+		if topicName == systemTopic {
+			return true
+		}
+	}
+
+	// Also check for topics with system prefixes
+	return strings.HasPrefix(topicName, "_") || strings.HasPrefix(topicName, "__")
+}
+
 func (p *LocalPartition) Shutdown() {
 	p.closePublishers()
 	p.closeSubscribers()
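
The constructor change above guards against a zero flush interval. As a quick illustration, here is a minimal standalone sketch of that guard, with the clamp pulled out into a hypothetical clampFlushInterval helper (the diff inlines this logic directly in NewLocalPartition):

package main

import (
	"fmt"
	"time"
)

// clampFlushInterval mirrors the guard added in NewLocalPartition: it converts
// a configured interval in seconds into a time.Duration and enforces a floor
// of one second, since a zero duration would turn the flush loop's sleep into
// a CPU-consuming busy loop. (Hypothetical helper name for illustration only.)
func clampFlushInterval(logFlushInterval int) time.Duration {
	flushInterval := time.Duration(logFlushInterval) * time.Second
	if flushInterval == 0 {
		flushInterval = 1 * time.Second
	}
	return flushInterval
}

func main() {
	fmt.Println(clampFlushInterval(0)) // 1s: floor applied, near-immediate flushing without spinning
	fmt.Println(clampFlushInterval(5)) // 5s: configured value passed through unchanged
}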
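
The offset-based branch of Subscribe is the bulk of the diff, and its control flow is easier to see with the log-buffer calls stubbed out. Below is a minimal sketch under that simplification: readDisk and readMemory are hypothetical stand-ins for p.LogBuffer.ReadFromDiskFn and p.LogBuffer.LoopProcessLogDataWithOffset, and errResumeFromDisk stands in for log_buffer.ResumeFromDiskError.

package sketch

import "errors"

// errResumeFromDisk stands in for log_buffer.ResumeFromDiskError: the
// in-memory buffer was flushed, so the reader must catch up from disk.
var errResumeFromDisk = errors.New("resume from disk")

// position is a simplified stand-in for log_buffer's message position.
type position struct{ offset int64 }

// subscribeByOffset mirrors the shape of the offset-based path above:
// one initial disk pass for historical data, then a hot loop over the
// in-memory buffer, dropping back to disk exactly once per flush.
func subscribeByOffset(
	start position,
	readDisk, readMemory func(position) (position, bool, error),
) error {
	// Step 1: one disk pass to pick up historical data (cheap if none).
	pos, done, err := readDisk(start)
	if err != nil || done {
		return err
	}
	// Step 2: hot path over the in-memory buffer.
	for {
		pos, done, err = readMemory(pos)
		if done {
			return nil
		}
		if errors.Is(err, errResumeFromDisk) {
			// Data was flushed mid-read: one disk pass to catch up,
			// then back to the in-memory loop.
			if pos, done, err = readDisk(pos); err != nil || done {
				return err
			}
			continue
		}
		if err != nil {
			return err
		}
	}
}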
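
Lastly, a small usage sketch of the new isSystemTopic check against sample topic names; the helper body copies the diff's logic:

package main

import (
	"fmt"
	"strings"
)

// isSystemTopic is copied from the diff above: exact matches for well-known
// system topics, plus a catch-all for underscore-prefixed names.
func isSystemTopic(topicName string) bool {
	systemTopics := []string{"_schemas", "__consumer_offsets", "__transaction_state"}
	for _, systemTopic := range systemTopics {
		if topicName == systemTopic {
			return true
		}
	}
	return strings.HasPrefix(topicName, "_") || strings.HasPrefix(topicName, "__")
}

func main() {
	for _, name := range []string{"_schemas", "__consumer_offsets", "orders", "_internal"} {
		fmt.Printf("%-20s system=%v\n", name, isSystemTopic(name))
	}
	// Output:
	// _schemas             system=true
	// __consumer_offsets   system=true
	// orders               system=false
	// _internal            system=true
}

Note that strings.HasPrefix(topicName, "_") already matches any name starting with "__", so the second prefix check is redundant; the effective rule is that every underscore-prefixed topic, not only the three listed, is exempted from aggressive shutdown by MaybeShutdownLocalPartitionForTopic.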
