aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-20 17:26:38 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-20 17:26:38 -0700
commit4bf959edf0195bdd80fc268f795f6e9710e0b269 (patch)
treee904883e8b084a3e35ac0bbb93f39d6495bf8a0a
parentbd43c62fbd659d6a7a058de3b4888e34f7cfd64f (diff)
downloadseaweedfs-4bf959edf0195bdd80fc268f795f6e9710e0b269.tar.xz
seaweedfs-4bf959edf0195bdd80fc268f795f6e9710e0b269.zip
message broker: read also from sealed memory buffer
-rw-r--r--weed/util/log_buffer/log_buffer.go5
-rw-r--r--weed/util/log_buffer/log_buffer_test.go2
-rw-r--r--weed/util/log_buffer/sealed_buffer.go17
3 files changed, 22 insertions, 2 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index d875dd54b..f84a58c74 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -122,6 +122,11 @@ func (m *LogBuffer) loopInterval() {
m.Unlock()
m.flushChan <- toFlush
time.Sleep(m.flushInterval)
+ if m.notifyFn != nil {
+ // check whether blocked clients are already disconnected
+ println("notifying log buffer readers")
+ m.notifyFn()
+ }
}
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index bf1d51703..f9ccc95c2 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -19,7 +19,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
startTime := time.Now()
messageSize := 1024
- messageCount := 100
+ messageCount := 5000
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
index c5160fad0..e412b5f32 100644
--- a/weed/util/log_buffer/sealed_buffer.go
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -4,6 +4,7 @@ import "time"
type MemBuffer struct {
buf []byte
+ size int
startTime time.Time
stopTime time.Time
}
@@ -25,16 +26,30 @@ func newSealedBuffers(size int) *SealedBuffers {
return sbs
}
-func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) {
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0]
size := len(sbs.buffers)
for i := 0; i < size-1; i++ {
sbs.buffers[i].buf = sbs.buffers[i+1].buf
+ sbs.buffers[i].size = sbs.buffers[i+1].size
sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
}
sbs.buffers[size-1].buf = buf
+ sbs.buffers[size-1].size = pos
sbs.buffers[size-1].startTime = startTime
sbs.buffers[size-1].stopTime = stopTime
return oldMemBuffer.buf
}
+
+func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
+ lastReadTs := lastReadTime.UnixNano()
+ for pos < len(mb.buf) {
+ size, t := readTs(mb.buf, pos)
+ if t > lastReadTs {
+ return
+ }
+ pos += size + 4
+ }
+ return len(mb.buf)
+}