aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer2/filer.go6
-rw-r--r--weed/filer2/filer_notify.go70
-rw-r--r--weed/queue/log_buffer.go92
3 files changed, 96 insertions, 72 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 428589975..014eb19e9 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/queue"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -35,7 +36,7 @@ type Filer struct {
DirQueuesPath string
buckets *FilerBuckets
Cipher bool
- metaLogBuffer *LogBuffer
+ metaLogBuffer *queue.LogBuffer
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort uint32) *Filer {
@@ -45,7 +46,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, filerGrpcPort ui
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}
- f.metaLogBuffer = NewLogBuffer(time.Minute, f.logFlushFunc)
+ f.metaLogBuffer = queue.NewLogBuffer(time.Minute, f.logFlushFunc)
go f.loopProcessingDeletion()
@@ -312,5 +313,6 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
}
func (f *Filer) Shutdown() {
+ f.metaLogBuffer.Shutdown()
f.store.Shutdown()
}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 54c3d0c48..e808e45f0 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -3,7 +3,6 @@ package filer2
import (
"fmt"
"strings"
- "sync"
"time"
"github.com/golang/protobuf/proto"
@@ -78,72 +77,3 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
}
}
-type LogBuffer struct {
- buf []byte
- pos int
- startTime time.Time
- stopTime time.Time
- sizeBuf []byte
- flushInterval time.Duration
- flushFn func(startTime, stopTime time.Time, buf []byte)
- sync.Mutex
-}
-
-func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
- lb := &LogBuffer{
- buf: make([]byte, 4*0124*1024),
- sizeBuf: make([]byte, 4),
- flushInterval: 2 * time.Second, // flushInterval,
- flushFn: flushFn,
- }
- go lb.loopFlush()
- return lb
-}
-
-func (m *LogBuffer) loopFlush() {
- for {
- m.Lock()
- m.flush()
- m.Unlock()
- time.Sleep(m.flushInterval)
- }
-}
-
-func (m *LogBuffer) flush() {
- if m.flushFn != nil && m.pos > 0 {
- m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
- m.pos = 0
- }
-}
-
-func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
-
- logEntry := &filer_pb.LogEntry{
- TsNs: ts.UnixNano(),
- PartitionKeyHash: util.HashToInt32(key),
- Data: data,
- }
-
- logEntryData, _ := proto.Marshal(logEntry)
-
- size := len(logEntryData)
-
- m.Lock()
- defer m.Unlock()
-
- if m.pos == 0 {
- m.startTime = ts
- }
-
- if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
- m.flush()
- m.startTime = ts
- }
- m.stopTime = ts
-
- util.Uint32toBytes(m.sizeBuf, uint32(size))
- copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
-
- copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
- m.pos += size + 4
-}
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
new file mode 100644
index 000000000..d6ccdf2a6
--- /dev/null
+++ b/weed/queue/log_buffer.go
@@ -0,0 +1,92 @@
+package queue
+
+import (
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type LogBuffer struct {
+ buf []byte
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
+ flushFn func(startTime, stopTime time.Time, buf []byte)
+ isStopping bool
+ sync.Mutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
+ lb := &LogBuffer{
+ buf: make([]byte, 4*0124*1024),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
+ }
+ go lb.loopFlush()
+ return lb
+}
+
+func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
+
+ logEntry := &filer_pb.LogEntry{
+ TsNs: ts.UnixNano(),
+ PartitionKeyHash: util.HashToInt32(key),
+ Data: data,
+ }
+
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ size := len(logEntryData)
+
+ m.Lock()
+ defer m.Unlock()
+
+ if m.pos == 0 {
+ m.startTime = ts
+ }
+
+ if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+ m.flush()
+ m.startTime = ts
+ }
+ m.stopTime = ts
+
+ util.Uint32toBytes(m.sizeBuf, uint32(size))
+ copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+
+ copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+ m.pos += size + 4
+}
+
+func (m *LogBuffer) Shutdown() {
+ if m.isStopping {
+ return
+ }
+ m.isStopping = true
+ m.Lock()
+ m.flush()
+ m.Unlock()
+}
+
+func (m *LogBuffer) loopFlush() {
+ for !m.isStopping {
+ m.Lock()
+ m.flush()
+ m.Unlock()
+ time.Sleep(m.flushInterval)
+ }
+}
+
+func (m *LogBuffer) flush() {
+ if m.flushFn != nil && m.pos > 0 {
+ m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
+ m.pos = 0
+ }
+}