diff options
Diffstat (limited to 'weed/mq/broker/broker_write.go')
| -rw-r--r-- | weed/mq/broker/broker_write.go | 88 |
1 files changed, 71 insertions, 17 deletions
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 2711f056b..bdb72a770 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -18,7 +19,13 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error return b.appendToFileWithBufferIndex(targetFile, data, 0) } -func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error { +func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferOffset int64, offsetArgs ...int64) error { + // Extract optional offset parameters (minOffset, maxOffset) + var minOffset, maxOffset int64 + if len(offsetArgs) >= 2 { + minOffset = offsetArgs[0] + maxOffset = offsetArgs[1] + } fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data) if err2 != nil { @@ -43,45 +50,92 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data }, } - // Add buffer start index for deduplication tracking (binary format) - if bufferIndex != 0 { + // Add buffer start offset for deduplication tracking (binary format) + if bufferOffset != 0 { entry.Extended = make(map[string][]byte) bufferStartBytes := make([]byte, 8) - binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) - entry.Extended["buffer_start"] = bufferStartBytes + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset)) + entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes + } + + // Add offset range metadata for Kafka integration + if minOffset > 0 && maxOffset >= minOffset { + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + minOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset)) + entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes + + maxOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset)) + entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes } } else if err != nil { return fmt.Errorf("find %s: %v", fullpath, err) } else { offset = int64(filer.TotalSize(entry.GetChunks())) - // Verify buffer index continuity for existing files (append operations) - if bufferIndex != 0 { + // Verify buffer offset continuity for existing files (append operations) + if bufferOffset != 0 { if entry.Extended == nil { entry.Extended = make(map[string][]byte) } // Check for existing buffer start (binary format) - if existingData, exists := entry.Extended["buffer_start"]; exists { + if existingData, exists := entry.Extended[mq.ExtendedAttrBufferStart]; exists { if len(existingData) == 8 { existingStartIndex := int64(binary.BigEndian.Uint64(existingData)) - // Verify that the new buffer index is consecutive - // Expected index = start + number of existing chunks - expectedIndex := existingStartIndex + int64(len(entry.GetChunks())) - if bufferIndex != expectedIndex { + // Verify that the new buffer offset is consecutive + // Expected offset = start + number of existing chunks + expectedOffset := existingStartIndex + int64(len(entry.GetChunks())) + if bufferOffset != expectedOffset { // This shouldn't happen in normal operation // Log warning but continue (don't crash the system) - glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d", - fullpath, expectedIndex, bufferIndex) + glog.Warningf("non-consecutive buffer offset for %s. Expected %d, got %d", + fullpath, expectedOffset, bufferOffset) } - // Note: We don't update the start index - it stays the same + // Note: We don't update the start offset - it stays the same } } else { // No existing buffer start, create new one (shouldn't happen for existing files) bufferStartBytes := make([]byte, 8) - binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex)) - entry.Extended["buffer_start"] = bufferStartBytes + binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset)) + entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes + } + } + + // Update offset range metadata for existing files + if minOffset > 0 && maxOffset >= minOffset { + // Update minimum offset if this chunk has a lower minimum + if existingMinData, exists := entry.Extended[mq.ExtendedAttrOffsetMin]; exists && len(existingMinData) == 8 { + existingMin := int64(binary.BigEndian.Uint64(existingMinData)) + if minOffset < existingMin { + minOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset)) + entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes + } + } else { + // No existing minimum, set it + minOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset)) + entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes + } + + // Update maximum offset if this chunk has a higher maximum + if existingMaxData, exists := entry.Extended[mq.ExtendedAttrOffsetMax]; exists && len(existingMaxData) == 8 { + existingMax := int64(binary.BigEndian.Uint64(existingMaxData)) + if maxOffset > existingMax { + maxOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset)) + entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes + } + } else { + // No existing maximum, set it + maxOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset)) + entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes } } } |
