aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-08-29 17:37:19 -0700
committerChris Lu <chris.lu@gmail.com>2020-08-29 17:37:19 -0700
commitb69cb74c033376d6738ef1537593c2349196bdb6 (patch)
treea894c06765bd8a4f078b042626d3d82cb2857ada
parent063c9ddac5277f4c20489129223f2c49ce51ad3b (diff)
downloadseaweedfs-b69cb74c033376d6738ef1537593c2349196bdb6.tar.xz
seaweedfs-b69cb74c033376d6738ef1537593c2349196bdb6.zip
read meta logs by timestamp
pass in event ts when moving logs meta aggregator reads in memory logs only
-rw-r--r--weed/filer2/filer.go7
-rw-r--r--weed/filer2/filer_notify.go4
-rw-r--r--weed/filer2/meta_aggregator.go8
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go2
-rw-r--r--weed/util/log_buffer/log_buffer.go21
-rw-r--r--weed/util/log_buffer/log_buffer_test.go2
6 files changed, 27 insertions, 17 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index d8929f88f..4c3caec7e 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -16,7 +16,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-const PaginationSize = 1024 * 256
+const (
+ LogFlushInterval = time.Minute
+ PaginationSize = 1024 * 256
+)
var (
OS_UID = uint32(os.Getuid())
@@ -47,7 +50,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
GrpcDialOption: grpcDialOption,
Signature: util.RandomInt32(),
}
- f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn)
+ f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 5e6d625e0..19a7e70f0 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -68,7 +68,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
return
}
- f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data)
+ f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
}
@@ -119,7 +119,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
- break
+ continue
}
return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
}
diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go
index 00fcf27d1..f2792bd26 100644
--- a/weed/filer2/meta_aggregator.go
+++ b/weed/filer2/meta_aggregator.go
@@ -25,13 +25,15 @@ type MetaAggregator struct {
ListenersCond *sync.Cond
}
+// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
+// The old data comes from what each LocalMetadata persisted on disk.
func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
t := &MetaAggregator{
filers: filers,
grpcDialOption: grpcDialOption,
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
- t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() {
+ t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
t.ListenersCond.Broadcast()
})
return t
@@ -48,7 +50,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now()
changesSinceLastPersist := 0
- lastTsNs := int64(0)
+ lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
MaxChangeLimit := 100
@@ -88,7 +90,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
}
dir := event.Directory
// println("received meta change", dir, "size", len(data))
- ma.MetaLogBuffer.AddToBuffer([]byte(dir), data)
+ ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index dc11061af..154bf8a44 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -85,7 +85,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
continue
}
- tl.logBuffer.AddToBuffer(in.Data.Key, data)
+ tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
if in.Data.IsClose {
// println("server received closing")
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index d066014d1..e4310b5c5 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
m.Lock()
defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}()
// need to put the timestamp inside the lock
- ts := time.Now()
- tsNs := ts.UnixNano()
- if m.lastTsNs >= tsNs {
+ var ts time.Time
+ if eventTsNs == 0 {
+ ts = time.Now()
+ eventTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, eventTsNs)
+ }
+ if m.lastTsNs >= eventTsNs {
// this is unlikely to happen, but just in case
- tsNs = m.lastTsNs + 1
- ts = time.Unix(0, tsNs)
+ eventTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, eventTsNs)
}
- m.lastTsNs = tsNs
+ m.lastTsNs = eventTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: tsNs,
+ TsNs: eventTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index f9ccc95c2..3d77afb18 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf)
+ lb.AddToBuffer(nil, buf, 0)
}
receivedmessageCount := 0