aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer2/filer_notify.go5
-rw-r--r--weed/queue/log_buffer.go49
2 files changed, 47 insertions, 7 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 29ba3eead..f85ee1db4 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -81,6 +81,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
var buf []byte
newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime)
+ var processedTs int64
for pos := 0; pos+4 < len(buf); {
@@ -103,7 +104,10 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
err = eachEventFn(event.Directory, event.EventNotification)
+ processedTs = logEntry.TsNs
+
if err != nil {
+ newLastReadTime = time.Unix(0, processedTs)
return
}
@@ -111,6 +115,7 @@ func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath
}
+ newLastReadTime = time.Unix(0, processedTs)
return
}
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
index 325fcbf48..0a8298be4 100644
--- a/weed/queue/log_buffer.go
+++ b/weed/queue/log_buffer.go
@@ -1,6 +1,8 @@
package queue
import (
+ "fmt"
+ "runtime/debug"
"sync"
"time"
@@ -11,6 +13,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+type dataToFlush struct {
+ startTime time.Time
+ stopTime time.Time
+ data []byte
+}
+
type LogBuffer struct {
buf []byte
idx []int
@@ -22,6 +30,7 @@ type LogBuffer struct {
flushFn func(startTime, stopTime time.Time, buf []byte)
notifyFn func()
isStopping bool
+ flushChan chan *dataToFlush
sync.RWMutex
}
@@ -32,8 +41,10 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
flushInterval: flushInterval,
flushFn: flushFn,
notifyFn: notifyFn,
+ flushChan: make(chan *dataToFlush, 256),
}
go lb.loopFlush()
+ go lb.loopInterval()
return lb
}
@@ -62,7 +73,7 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
}
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
- m.flush()
+ m.flushChan <- m.copyToFlush()
m.startTime = ts
if len(m.buf) < size+4 {
m.buf = make([]byte, 2*size+4)
@@ -83,27 +94,45 @@ func (m *LogBuffer) Shutdown() {
}
m.isStopping = true
m.Lock()
- m.flush()
+ toFlush := m.copyToFlush()
m.Unlock()
+ m.flushChan <- toFlush
+ close(m.flushChan)
}
func (m *LogBuffer) loopFlush() {
+ for d := range m.flushChan {
+ if d != nil {
+ m.flushFn(d.startTime, d.stopTime, d.data)
+ }
+ }
+}
+
+func (m *LogBuffer) loopInterval() {
for !m.isStopping {
m.Lock()
- m.flush()
+ toFlush := m.copyToFlush()
m.Unlock()
+ m.flushChan <- toFlush
time.Sleep(m.flushInterval)
}
}
-func (m *LogBuffer) flush() {
+func (m *LogBuffer) copyToFlush() *dataToFlush {
if m.flushFn != nil && m.pos > 0 {
- // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
- m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
+ fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
+ debug.PrintStack()
+ d := &dataToFlush{
+ startTime: m.startTime,
+ stopTime: m.stopTime,
+ data: copiedBytes(m.buf[:m.pos]),
+ }
m.pos = 0
m.idx = m.idx[:0]
+ return d
}
+ return nil
}
func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
@@ -125,8 +154,14 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
lastTs := lastReadTime.UnixNano()
l, h := 0, len(m.idx)-1
+ /*
+ for i, pos := range m.idx {
+ ts := readTs(m.buf, pos)
+ fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos)
+ }
+ fmt.Printf("l=%d, h=%d\n", l, h)
+ */
- // fmt.Printf("l=%d, h=%d\n", l, h)
for {
mid := (l + h) / 2
pos := m.idx[mid]