diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_follow.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_log_buffer_offset.go | 27 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 22 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition_offset.go | 7 |
4 files changed, 34 insertions, 28 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 117dc4f87..d8f472249 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -53,7 +53,11 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi // TODO: change this to DataMessage // log the message - logBuffer.AddToBuffer(dataMessage) + if addErr := logBuffer.AddToBuffer(dataMessage); addErr != nil { + err = fmt.Errorf("failed to add message to log buffer: %w", addErr) + glog.Errorf("Failed to add message to log buffer: %v", addErr) + break + } // send back the ack if err := stream.Send(&mq_pb.PublishFollowMeResponse{ diff --git a/weed/mq/broker/broker_log_buffer_offset.go b/weed/mq/broker/broker_log_buffer_offset.go index aeb8fad1b..104722af1 100644 --- a/weed/mq/broker/broker_log_buffer_offset.go +++ b/weed/mq/broker/broker_log_buffer_offset.go @@ -8,7 +8,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" - "google.golang.org/protobuf/proto" ) // OffsetAssignmentFunc is a function type for assigning offsets to messages @@ -30,13 +29,9 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( } // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock - var ts time.Time processingTsNs := message.TsNs if processingTsNs == 0 { - ts = time.Now() - processingTsNs = ts.UnixNano() - } else { - ts = time.Unix(0, processingTsNs) + processingTsNs = time.Now().UnixNano() } // Create LogEntry with assigned offset @@ -48,33 +43,21 @@ func (b *MessageQueueBroker) AddToBufferWithOffset( Offset: offset, // Add the assigned offset } - logEntryData, err := proto.Marshal(logEntry) - if err != nil { - return err - } - // Use the existing LogBuffer infrastructure for the rest // TODO: This is a workaround - ideally LogBuffer should handle offset assignment // For now, we'll add the message with the pre-assigned offset - return b.addLogEntryToBuffer(logBuffer, logEntry, logEntryData, ts) + return b.addLogEntryToBuffer(logBuffer, logEntry) } // addLogEntryToBuffer adds a pre-constructed LogEntry to the buffer -// This is a helper function that mimics LogBuffer.AddDataToBuffer but with a pre-built LogEntry +// This is a helper function that directly uses LogBuffer.AddLogEntryToBuffer func (b *MessageQueueBroker) addLogEntryToBuffer( logBuffer *log_buffer.LogBuffer, logEntry *filer_pb.LogEntry, - logEntryData []byte, - ts time.Time, ) error { - // TODO: This is a simplified version of LogBuffer.AddDataToBuffer - // ASSUMPTION: We're bypassing some of the LogBuffer's internal logic - // This should be properly integrated when LogBuffer is modified - - // Use the new AddLogEntryToBuffer method to preserve offset information + // Use the AddLogEntryToBuffer method to preserve offset information // This ensures the offset is maintained throughout the entire data flow - logBuffer.AddLogEntryToBuffer(logEntry) - return nil + return logBuffer.AddLogEntryToBuffer(logEntry) } // GetPartitionOffsetInfoInternal returns offset information for a partition (internal method) diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 5f5c2278f..f03bca2f5 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -68,7 +68,9 @@ func NewLocalPartition(partition Partition, logFlushInterval int, logFlushFn log } func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { - p.LogBuffer.AddToBuffer(message) + if err := p.LogBuffer.AddToBuffer(message); err != nil { + return fmt.Errorf("failed to add message to log buffer: %w", err) + } p.UpdateActivity() // Track publish activity for idle cleanup // maybe send to the follower @@ -107,11 +109,17 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M return eachMessageFn(logEntry) } + // Wrap eachMessageFn for disk reads to also update activity + eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + p.UpdateActivity() // Track disk read activity for idle cleanup + return eachMessageFn(logEntry) + } + // Always attempt initial disk read for historical data // This is fast if no data on disk, and ensures we don't miss old data // The memory read loop below handles new data with instant notifications glog.V(2).Infof("%s reading historical data from disk starting at offset %d", clientName, startPosition.Offset) - processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) if readPersistedLogErr != nil { glog.V(2).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr @@ -145,7 +153,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M // Read from disk ONCE to catch up, then continue with in-memory buffer if readInMemoryLogErr == log_buffer.ResumeFromDiskError { glog.V(4).Infof("SUBSCRIBE: ResumeFromDiskError - reading flushed data from disk for %s at offset %d", clientName, startPosition.Offset) - processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) if readPersistedLogErr != nil { glog.V(2).Infof("%s read %v persisted log after flush: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr @@ -175,8 +183,14 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M } // Original timestamp-based subscription logic + // Wrap eachMessageFn for disk reads to also update activity + eachMessageWithActivityFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + p.UpdateActivity() // Track disk read activity for idle cleanup + return eachMessageFn(logEntry) + } + for { - processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) + processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageWithActivityFn) if readPersistedLogErr != nil { glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr diff --git a/weed/mq/topic/local_partition_offset.go b/weed/mq/topic/local_partition_offset.go index e15234ca0..9c8a2dac4 100644 --- a/weed/mq/topic/local_partition_offset.go +++ b/weed/mq/topic/local_partition_offset.go @@ -28,6 +28,9 @@ func (p *LocalPartition) PublishWithOffset(message *mq_pb.DataMessage, assignOff return 0, fmt.Errorf("failed to add message to buffer: %w", err) } + // Track publish activity for idle cleanup (consistent with Publish method) + p.UpdateActivity() + // Send to follower if needed (same logic as original Publish) if p.publishFolloweMeStream != nil { if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ @@ -62,7 +65,9 @@ func (p *LocalPartition) addToBufferWithOffset(message *mq_pb.DataMessage, offse } // Add the entry to the buffer in a way that preserves offset on disk and in-memory - p.LogBuffer.AddLogEntryToBuffer(logEntry) + if err := p.LogBuffer.AddLogEntryToBuffer(logEntry); err != nil { + return fmt.Errorf("failed to add log entry to buffer: %w", err) + } return nil } |
