aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_topic_partition_read_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_topic_partition_read_write.go')
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go19
1 files changed, 14 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index d6513b2a2..4b0a95217 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -2,13 +2,21 @@ package broker
import (
"fmt"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "sync/atomic"
- "time"
)
+// LogBufferStart tracks the starting buffer index for a live log file
+// Buffer indexes are monotonically increasing, count = number of chunks
+// Now stored in binary format for efficiency
+type LogBufferStart struct {
+ StartIndex int64 // Starting buffer index (count = len(chunks))
+}
+
func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
partitionDir := topic.PartitionDir(t, p)
@@ -21,10 +29,11 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
- // TODO append block with more metadata
+ // Get buffer index (now globally unique across restarts)
+ bufferIndex := logBuffer.GetBatchIndex()
for {
- if err := b.appendToFile(targetFile, buf); err != nil {
+ if err := b.appendToFileWithBufferIndex(targetFile, buf, bufferIndex); err != nil {
glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
@@ -40,6 +49,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
- glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
+ glog.V(0).Infof("flushing at %d to %s size %d from buffer %s (index %d)", logBuffer.LastFlushTsNs, targetFile, len(buf), logBuffer.GetName(), bufferIndex)
}
}