aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-10 01:35:59 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-10 01:35:59 -0700
commitbcf37346ef23d2e31d2baa778ff9f839a3af96c0 (patch)
treeb57452bf94b0d6ddbd6e11c9098b4f72a9e6952a
parent9fa065f600cbb4fbdabcca60e12fb144a4ed97b5 (diff)
downloadseaweedfs-bcf37346ef23d2e31d2baa778ff9f839a3af96c0.tar.xz
seaweedfs-bcf37346ef23d2e31d2baa778ff9f839a3af96c0.zip
add timestamp inside lock
-rw-r--r--weed/filer2/filer_notify.go6
-rw-r--r--weed/queue/log_buffer.go42
2 files changed, 25 insertions, 23 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 095587038..de07e1cf9 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -45,11 +45,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage(fullpath, eventNotification)
}
- f.logMetaEvent(time.Now(), fullpath, eventNotification)
+ f.logMetaEvent(fullpath, eventNotification)
}
-func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *filer_pb.EventNotification) {
+func (f *Filer) logMetaEvent(fullpath string, eventNotification *filer_pb.EventNotification) {
dir, _ := util.FullPath(fullpath).DirAndName()
@@ -63,7 +63,7 @@ func (f *Filer) logMetaEvent(ts time.Time, fullpath string, eventNotification *f
return
}
- f.metaLogBuffer.AddToBuffer(ts, []byte(dir), data)
+ f.metaLogBuffer.AddToBuffer([]byte(dir), data)
}
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
index 5f8db140d..f24429479 100644
--- a/weed/queue/log_buffer.go
+++ b/weed/queue/log_buffer.go
@@ -46,8 +46,18 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
+func (m *LogBuffer) AddToBuffer(key, data []byte) {
+ m.Lock()
+ defer func() {
+ m.Unlock()
+ if m.notifyFn != nil {
+ m.notifyFn()
+ }
+ }()
+
+ // need to put the timestamp inside the lock
+ ts := time.Now()
logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(),
PartitionKeyHash: util.HashToInt32(key),
@@ -58,14 +68,6 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
size := len(logEntryData)
- m.Lock()
- defer func() {
- m.Unlock()
- if m.notifyFn != nil {
- m.notifyFn()
- }
- }()
-
if m.pos == 0 {
m.startTime = ts
}
@@ -153,18 +155,18 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
l, h := 0, len(m.idx)-1
/*
- for i, pos := range m.idx {
- logEntry, ts := readTs(m.buf, pos)
- event := &filer_pb.FullEventNotification{}
- proto.Unmarshal(logEntry.Data, event)
- entry := event.EventNotification.OldEntry
- if entry == nil {
- entry = event.EventNotification.NewEntry
+ for i, pos := range m.idx {
+ logEntry, ts := readTs(m.buf, pos)
+ event := &filer_pb.FullEventNotification{}
+ proto.Unmarshal(logEntry.Data, event)
+ entry := event.EventNotification.OldEntry
+ if entry == nil {
+ entry = event.EventNotification.NewEntry
+ }
+ fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
}
- fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
- }
- fmt.Printf("l=%d, h=%d\n", l, h)
- */
+ fmt.Printf("l=%d, h=%d\n", l, h)
+ */
for l <= h {
mid := (l + h) / 2