Diffstat (limited to 'weed/mq/broker/broker_write.go')
 weed/mq/broker/broker_write.go | 91 ++++++++++++++++++++++++------
 1 file changed, 74 insertions(+), 17 deletions(-)
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go
index 2711f056b..bdb72a770 100644
--- a/weed/mq/broker/broker_write.go
+++ b/weed/mq/broker/broker_write.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -18,7 +19,13 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error
return b.appendToFileWithBufferIndex(targetFile, data, 0)
}
-func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
+func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferOffset int64, offsetArgs ...int64) error {
+ // Extract optional offset parameters (minOffset, maxOffset)
+ var minOffset, maxOffset int64
+ if len(offsetArgs) >= 2 {
+ minOffset = offsetArgs[0]
+ maxOffset = offsetArgs[1]
+ }
fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
if err2 != nil {
@@ -43,45 +50,95 @@ func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data
},
}
- // Add buffer start index for deduplication tracking (binary format)
- if bufferIndex != 0 {
+ // Add buffer start offset for deduplication tracking (binary format)
+ if bufferOffset != 0 {
entry.Extended = make(map[string][]byte)
bufferStartBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
- entry.Extended["buffer_start"] = bufferStartBytes
+ binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset))
+ entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
+ }
+
+ // Add offset range metadata for Kafka integration
+ if minOffset > 0 && maxOffset >= minOffset {
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ minOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+
+ maxOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
}
} else if err != nil {
return fmt.Errorf("find %s: %v", fullpath, err)
} else {
offset = int64(filer.TotalSize(entry.GetChunks()))
- // Verify buffer index continuity for existing files (append operations)
- if bufferIndex != 0 {
+ // Verify buffer offset continuity for existing files (append operations)
+ if bufferOffset != 0 {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
// Check for existing buffer start (binary format)
- if existingData, exists := entry.Extended["buffer_start"]; exists {
+ if existingData, exists := entry.Extended[mq.ExtendedAttrBufferStart]; exists {
if len(existingData) == 8 {
existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
- // Verify that the new buffer index is consecutive
- // Expected index = start + number of existing chunks
- expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
- if bufferIndex != expectedIndex {
+ // Verify that the new buffer offset is consecutive
+ // Expected offset = start + number of existing chunks
+ expectedOffset := existingStartIndex + int64(len(entry.GetChunks()))
+ if bufferOffset != expectedOffset {
// This shouldn't happen in normal operation
// Log warning but continue (don't crash the system)
- glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d",
- fullpath, expectedIndex, bufferIndex)
+ glog.Warningf("non-consecutive buffer offset for %s. Expected %d, got %d",
+ fullpath, expectedOffset, bufferOffset)
}
- // Note: We don't update the start index - it stays the same
+ // Note: We don't update the start offset - it stays the same
}
} else {
// No existing buffer start, create new one (shouldn't happen for existing files)
bufferStartBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
- entry.Extended["buffer_start"] = bufferStartBytes
+ binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferOffset))
+ entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
+ }
+ }
+
+ // Update offset range metadata for existing files
+ if minOffset > 0 && maxOffset >= minOffset {
+ // Guard against a nil map: Extended is only initialized above when
+ // bufferOffset != 0, and writing to a nil map panics.
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ // Update minimum offset if this chunk has a lower minimum
+ if existingMinData, exists := entry.Extended[mq.ExtendedAttrOffsetMin]; exists && len(existingMinData) == 8 {
+ existingMin := int64(binary.BigEndian.Uint64(existingMinData))
+ if minOffset < existingMin {
+ minOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+ }
+ } else {
+ // No existing minimum, set it
+ minOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+ }
+
+ // Update maximum offset if this chunk has a higher maximum
+ if existingMaxData, exists := entry.Extended[mq.ExtendedAttrOffsetMax]; exists && len(existingMaxData) == 8 {
+ existingMax := int64(binary.BigEndian.Uint64(existingMaxData))
+ if maxOffset > existingMax {
+ maxOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
+ }
+ } else {
+ // No existing maximum, set it
+ maxOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
}
}
}
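
The variadic tail keeps existing call sites compiling while letting Kafka-aware callers attach an offset range. A minimal sketch of the calling convention, assuming a hypothetical caller; only appendToFile and appendToFileWithBufferIndex themselves appear in this diff:

	// Plain append: no buffer offset, no Kafka offset range.
	if err := b.appendToFile(targetFile, data); err != nil {
		return err
	}

	// Append with a buffer start offset for deduplication tracking.
	if err := b.appendToFileWithBufferIndex(targetFile, data, bufferOffset); err != nil {
		return err
	}

	// Append with the optional (minOffset, maxOffset) pair in the variadic
	// tail. Both values must be supplied together: the len(offsetArgs) >= 2
	// check silently ignores fewer than two trailing arguments.
	if err := b.appendToFileWithBufferIndex(targetFile, data, bufferOffset, minOffset, maxOffset); err != nil {
		return err
	}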
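
On the read side, the same big-endian encoding round-trips back to int64. A minimal decode sketch, assuming the mq attribute constants used above; the package name and helper function are hypothetical:

	package example // hypothetical package, sketch only

	import (
		"encoding/binary"

		"github.com/seaweedfs/seaweedfs/weed/mq"
	)

	// decodeOffsetRange reads back the Kafka offset range written above.
	// Reads on a nil map are safe in Go, so callers need not pre-check
	// entry.Extended before calling this.
	func decodeOffsetRange(extended map[string][]byte) (minOffset, maxOffset int64, ok bool) {
		minData, hasMin := extended[mq.ExtendedAttrOffsetMin]
		maxData, hasMax := extended[mq.ExtendedAttrOffsetMax]
		if !hasMin || !hasMax || len(minData) != 8 || len(maxData) != 8 {
			return 0, 0, false
		}
		// Written with binary.BigEndian.PutUint64(..., uint64(offset)); the
		// uint64 -> int64 conversion restores the original value.
		return int64(binary.BigEndian.Uint64(minData)), int64(binary.BigEndian.Uint64(maxData)), true
	}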
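
The continuity check encodes a simple invariant: chunk N of a file carries buffer offset start+N, so the expected offset for a new chunk is the recorded start plus the current chunk count. A small worked sketch of that invariant (helper name hypothetical):

	// isConsecutive mirrors the warning check above: a file whose recorded
	// buffer start is 100 and which already holds 3 chunks expects the next
	// chunk to arrive with buffer offset 103.
	func isConsecutive(existingStart int64, existingChunks int, newBufferOffset int64) bool {
		expected := existingStart + int64(existingChunks)
		return newBufferOffset == expected
	}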