aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer_notify.go112
-rw-r--r--weed/filer/filer_notify_read.go350
-rw-r--r--weed/util/queue.go11
3 files changed, 383 insertions, 90 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index db953d398..621d4c227 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"io"
- "math"
"regexp"
"strings"
"time"
@@ -116,101 +115,34 @@ var (
func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
- startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
- var stopDate, stopHourMinute string
- if stopTsNs != 0 {
- stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
- stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
- stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
- }
-
- sizeBuf := make([]byte, 4)
- startTsNs := startPosition.UnixNano()
-
- dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
- if listDayErr != nil {
- return lastTsNs, isDone, fmt.Errorf("fail to list log by day: %v", listDayErr)
+ visitor, visitErr := f.collectPersistedLogBuffer(startPosition, stopTsNs)
+ if visitErr != nil {
+ if visitErr == io.EOF {
+ return
+ }
+ err = fmt.Errorf("reading from persisted logs: %v", visitErr)
+ return
}
- for _, dayEntry := range dayEntries {
- if stopDate != "" {
- if strings.Compare(dayEntry.Name(), stopDate) > 0 {
+ var logEntry *filer_pb.LogEntry
+ for {
+ logEntry, visitErr = visitor.GetNext()
+ if visitErr != nil {
+ if visitErr == io.EOF {
break
}
+ err = fmt.Errorf("read next from persisted logs: %v", visitErr)
+ return
}
- // println("checking day", dayEntry.FullPath)
- hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
- if listHourMinuteErr != nil {
- return lastTsNs, isDone, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
+ isDone, visitErr = eachLogEntryFn(logEntry)
+ if visitErr != nil {
+ err = fmt.Errorf("process persisted log entry: %v", visitErr)
+ return
}
- for _, hourMinuteEntry := range hourMinuteEntries {
- // println("checking hh-mm", hourMinuteEntry.FullPath)
- if dayEntry.Name() == startDate {
- hourMinute := util.FileNameBase(hourMinuteEntry.Name())
- if strings.Compare(hourMinute, startHourMinute) < 0 {
- continue
- }
- }
- if dayEntry.Name() == stopDate {
- hourMinute := util.FileNameBase(hourMinuteEntry.Name())
- if strings.Compare(hourMinute, stopHourMinute) > 0 {
- break
- }
- }
- // println("processing", hourMinuteEntry.FullPath)
- chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.GetChunks())
- if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, stopTsNs, eachLogEntryFn); err != nil {
- chunkedFileReader.Close()
- if err == io.EOF {
- continue
- }
- if VolumeNotFoundPattern.MatchString(err.Error()) {
- glog.Warningf("skipping reading %s: %v", hourMinuteEntry.FullPath, err)
- continue
- }
- return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
- }
- chunkedFileReader.Close()
+ lastTsNs = logEntry.TsNs
+ if isDone {
+ return
}
}
- return lastTsNs, isDone, nil
-}
-
-func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) {
- for {
- n, err := r.Read(sizeBuf)
- if err != nil {
- return lastTsNs, err
- }
- if n != 4 {
- return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n)
- }
- size := util.BytesToUint32(sizeBuf)
- // println("entry size", size)
- entryData := make([]byte, size)
- n, err = r.Read(entryData)
- if err != nil {
- return lastTsNs, err
- }
- if n != int(size) {
- return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
- }
- logEntry := &filer_pb.LogEntry{}
- if err = proto.Unmarshal(entryData, logEntry); err != nil {
- return lastTsNs, err
- }
- if logEntry.TsNs <= startTsNs {
- continue
- }
- if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
- return lastTsNs, err
- }
- // println("each log: ", logEntry.TsNs)
- if _, err := eachLogEntryFn(logEntry); err != nil {
- return lastTsNs, err
- } else {
- lastTsNs = logEntry.TsNs
- }
- }
+ return
}
diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go
new file mode 100644
index 000000000..2e75677d7
--- /dev/null
+++ b/weed/filer/filer_notify_read.go
@@ -0,0 +1,350 @@
+package filer
+
+import (
+ "container/heap"
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "math"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// LogFileEntry is one persisted log file waiting in a per-filer queue,
+// together with the minute timestamp derived from its path.
+type LogFileEntry struct {
+ // TsNs is the Unix-nanosecond time parsed from "<day>-<hour-minute>" by
+ // LogFileEntryCollector.collectMore.
+ TsNs int64
+ // FileEntry is the filer entry of the log file; newLogFileIterator reads its Chunks.
+ FileEntry *Entry
+}
+
+// collectPersistedLogBuffer lists the day directories under SystemLogDir,
+// passing the start position's date as the listing start name, and wraps them
+// in an OrderedLogVisitor.
+//
+// It returns io.EOF when a non-zero stopTsNs lies before startPosition, i.e.
+// when there is nothing to read.
+func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64) (v *OrderedLogVisitor, err error) {
+	// A zero stopTsNs means "no upper bound", so only a real stop time can end the read early.
+	if stopTsNs != 0 {
+		if startPosition.Time.UnixNano() > stopTsNs {
+			return nil, io.EOF
+		}
+	}
+
+	firstDay := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
+	days, _, listErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, firstDay, true, math.MaxInt32, "", "", "")
+	if listErr != nil {
+		return nil, fmt.Errorf("fail to list log by day: %v", listErr)
+	}
+
+	return NewOrderedLogVisitor(f, startPosition, stopTsNs, days)
+}
+
+// ----------
+// LogEntryItem is a log entry tagged with the id of the filer whose log file
+// it was read from, so the visitor knows which iterator to refill from.
+type LogEntryItem struct {
+	Entry *filer_pb.LogEntry
+	filer string
+}
+
+// LogEntryItemPriorityQueue is a min-heap of log entries ordered by TsNs.
+// It implements heap.Interface and must only be mutated through heap.Push and
+// heap.Pop.
+type LogEntryItemPriorityQueue []*LogEntryItem
+
+// Len returns the number of queued items.
+func (pq LogEntryItemPriorityQueue) Len() int { return len(pq) }
+
+// Less orders items by ascending entry timestamp.
+func (pq LogEntryItemPriorityQueue) Less(i, j int) bool {
+	return pq[i].Entry.TsNs < pq[j].Entry.TsNs
+}
+
+// Swap exchanges the items at i and j.
+func (pq LogEntryItemPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
+
+// Push appends x, which must be a *LogEntryItem; use heap.Push instead of
+// calling it directly.
+func (pq *LogEntryItemPriorityQueue) Push(x any) {
+	*pq = append(*pq, x.(*LogEntryItem))
+}
+
+// Pop removes and returns the last item; use heap.Pop instead of calling it
+// directly.
+func (pq *LogEntryItemPriorityQueue) Pop() any {
+	old := *pq
+	last := len(old) - 1
+	item := old[last]
+	// Clear the vacated slot so the backing array does not keep the popped
+	// entry reachable after the caller is done with it.
+	old[last] = nil
+	*pq = old[:last]
+	return item
+}
+
+// ----------
+
+// OrderedLogVisitor merges the persisted log files of several filers and
+// hands out their entries through GetNext in ascending TsNs order.
+type OrderedLogVisitor struct {
+ // perFilerIteratorMap holds one file-queue iterator per filer id (see getFilerId).
+ perFilerIteratorMap map[string]*LogFileQueueIterator
+ // pq holds at most one pending entry per filer; the smallest TsNs is popped first.
+ pq *LogEntryItemPriorityQueue
+ // logFileEntryCollector loads the log files of one more day directory on demand.
+ logFileEntryCollector *LogFileEntryCollector
+}
+
+// NewOrderedLogVisitor builds a visitor over dayEntries and eagerly loads the
+// first day directory so that GetNext has entries to return.
+//
+// Running out of day directories during that first load (io.EOF) is not an
+// error: the returned visitor is simply empty.
+func NewOrderedLogVisitor(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) (*OrderedLogVisitor, error) {
+
+	queue := &LogEntryItemPriorityQueue{}
+	heap.Init(queue)
+
+	visitor := &OrderedLogVisitor{
+		perFilerIteratorMap:   make(map[string]*LogFileQueueIterator),
+		pq:                    queue,
+		logFileEntryCollector: NewLogFileEntryCollector(f, startPosition, stopTsNs, dayEntries),
+	}
+
+	err := visitor.logFileEntryCollector.collectMore(visitor)
+	if err == nil || err == io.EOF {
+		return visitor, nil
+	}
+	return nil, err
+}
+
+// GetNext pops the entry with the smallest TsNs across all filers and queues
+// the following entry from the same filer in its place.
+//
+// It returns io.EOF once no filer has a pending entry.
+func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
+	if o.pq.Len() == 0 {
+		return nil, io.EOF
+	}
+	head := heap.Pop(o.pq).(*LogEntryItem)
+
+	// Keep one pending entry per filer in the heap: refill from the filer
+	// whose entry was just taken.
+	following, followErr := o.perFilerIteratorMap[head.filer].getNext(o)
+	switch {
+	case followErr == nil:
+		heap.Push(o.pq, &LogEntryItem{
+			Entry: following,
+			filer: head.filer,
+		})
+	case followErr != io.EOF:
+		return nil, fmt.Errorf("failed to get next log entry: %v", followErr)
+	}
+	// followErr == io.EOF: that filer has no more entries, nothing to push.
+	return head.Entry, nil
+}
+
+// getFilerId returns whatever follows the last "." in name, or "" when name
+// contains no ".". collectMore uses it to derive the filer id from a log file
+// name.
+func getFilerId(name string) string {
+	if dot := strings.LastIndex(name, "."); dot >= 0 {
+		return name[dot+1:]
+	}
+	return ""
+}
+
+// ----------
+
+// LogFileEntryCollector walks the queued day directories one at a time and
+// distributes their log files to the per-filer iterators of an
+// OrderedLogVisitor.
+type LogFileEntryCollector struct {
+ // f is the filer used to list the day directories and supply the master client.
+ f *Filer
+ // startTsNs and stopTsNs are passed on to every iterator created by collectMore;
+ // a zero stopTsNs means no upper bound.
+ startTsNs int64
+ stopTsNs int64
+ // dayEntryQueue holds the day directories that have not been listed yet.
+ dayEntryQueue *util.Queue[*Entry]
+ // startDate ("YYYY-MM-DD") and startHourMinute ("HH-MM") bound the first day:
+ // files named before startHourMinute on startDate are skipped.
+ startDate string
+ startHourMinute string
+ // stopDate and stopHourMinute bound the last day; both are empty when stopTsNs is 0.
+ stopDate string
+ stopHourMinute string
+}
+
+// NewLogFileEntryCollector queues dayEntries in the given order and
+// precomputes the date and hour-minute name bounds that collectMore uses to
+// skip log files outside the requested range.
+//
+// A zero stopTsNs means no upper bound; stopDate and stopHourMinute then stay
+// empty.
+func NewLogFileEntryCollector(f *Filer, startPosition log_buffer.MessagePosition, stopTsNs int64, dayEntries []*Entry) *LogFileEntryCollector {
+	dayEntryQueue := util.NewQueue[*Entry]()
+	for _, dayEntry := range dayEntries {
+		dayEntryQueue.Enqueue(dayEntry)
+	}
+
+	startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
+	startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
+	var stopDate, stopHourMinute string
+	if stopTsNs != 0 {
+		// NOTE(review): this pads the stop time by 24*60*60 nanoseconds (about
+		// 86µs), not by 24 hours; confirm which was intended before changing it.
+		stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
+		stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
+		stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
+	}
+
+	return &LogFileEntryCollector{
+		f:               f,
+		startTsNs:       startPosition.UnixNano(),
+		stopTsNs:        stopTsNs,
+		dayEntryQueue:   dayEntryQueue,
+		startDate:       startDate,
+		startHourMinute: startHourMinute,
+		stopDate:        stopDate,
+		stopHourMinute:  stopHourMinute,
+	}
+}
+
+// hasMore reports whether at least one day directory is still queued.
+func (c *LogFileEntryCollector) hasMore() bool {
+	pendingDays := c.dayEntryQueue.Len()
+	return pendingDays > 0
+}
+
+// collectMore dequeues the next day directory, lists its log files and
+// appends each file to the queue of the filer it belongs to. A filer seen for
+// the first time gets a new iterator, and its first entry is pushed onto the
+// visitor's priority queue.
+//
+// It returns io.EOF when no day directory is left or the next one lies after
+// stopDate. A fresh filer whose files yield no entry in range is skipped
+// rather than reported as an error.
+func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
+	dayEntry := c.dayEntryQueue.Dequeue()
+	if dayEntry == nil {
+		return io.EOF
+	}
+	if c.stopDate != "" && strings.Compare(dayEntry.Name(), c.stopDate) > 0 {
+		return io.EOF
+	}
+
+	hourMinuteEntries, _, listHourMinuteErr := c.f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
+	if listHourMinuteErr != nil {
+		return fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
+	}
+
+	// filer id -> name of the first file seen for it, used in error messages.
+	freshFilerIds := make(map[string]string)
+	for _, hourMinuteEntry := range hourMinuteEntries {
+		hourMinute := util.FileNameBase(hourMinuteEntry.Name())
+		// On the first day, skip files named before the start minute.
+		if dayEntry.Name() == c.startDate && strings.Compare(hourMinute, c.startHourMinute) < 0 {
+			continue
+		}
+		// On the last day, stop at the first file named after the stop minute.
+		if dayEntry.Name() == c.stopDate && strings.Compare(hourMinute, c.stopHourMinute) > 0 {
+			break
+		}
+
+		tsMinute := fmt.Sprintf("%s-%s", dayEntry.Name(), hourMinute)
+		t, parseErr := time.Parse("2006-01-02-15-04", tsMinute)
+		if parseErr != nil {
+			glog.Errorf("failed to parse %s: %v", tsMinute, parseErr)
+			continue
+		}
+		filerId := getFilerId(hourMinuteEntry.Name())
+		iter, found := v.perFilerIteratorMap[filerId]
+		if !found {
+			iter = newLogFileQueueIterator(c.f.MasterClient, util.NewQueue[*LogFileEntry](), c.startTsNs, c.stopTsNs)
+			v.perFilerIteratorMap[filerId] = iter
+			freshFilerIds[filerId] = hourMinuteEntry.Name()
+		}
+		iter.q.Enqueue(&LogFileEntry{
+			TsNs:      t.UnixNano(),
+			FileEntry: hourMinuteEntry,
+		})
+	}
+
+	// Seed the priority queue with the first entry of every newly seen filer.
+	// Filers that were already known have their pending entry in the heap.
+	for filerId, entryName := range freshFilerIds {
+		iter, found := v.perFilerIteratorMap[filerId]
+		if !found {
+			glog.Errorf("Unexpected! failed to find iterator for filer %s", filerId)
+			continue
+		}
+		next, nextErr := iter.getNext(v)
+		if nextErr == io.EOF {
+			// This filer has no entry in range; that is not a failure.
+			continue
+		}
+		if nextErr != nil {
+			return fmt.Errorf("failed to get next log entry for %v: %v", entryName, nextErr)
+		}
+		heap.Push(v.pq, &LogEntryItem{
+			Entry: next,
+			filer: filerId,
+		})
+	}
+
+	return nil
+}
+
+// ----------
+
+// LogFileQueueIterator reads the queued log files of a single filer one after
+// another and yields their entries through getNext.
+type LogFileQueueIterator struct {
+ // q holds the filer's log files that have not been opened yet.
+ q *util.Queue[*LogFileEntry]
+ // masterClient is handed to the chunk reader created for each file.
+ masterClient *wdclient.MasterClient
+ // startTsNs and stopTsNs bound the entries returned; a zero stopTsNs means no upper bound.
+ startTsNs int64
+ stopTsNs int64
+ // currentFileIterator reads the file currently being consumed; nil before the first file.
+ currentFileIterator *LogFileIterator
+}
+
+// newLogFileQueueIterator returns an iterator over the log files in q that
+// has not opened any file yet.
+func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
+	it := new(LogFileQueueIterator)
+	it.q = q
+	it.masterClient = masterClient
+	it.startTsNs = startTsNs
+	it.stopTsNs = stopTsNs
+	return it
+}
+
+// getNext returns the next log entry of this filer, moving on to the next
+// queued file whenever the current one is exhausted.
+//
+// It returns io.EOF when the queue is empty or the next queued file's minute
+// timestamp is after a non-zero stopTsNs. Any other error from the current
+// file iterator, or from loading more day directories, is returned as is.
+// v is used to ask the collector for the next day directory once this filer's
+// queue has been drained.
+func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
+ for {
+ if iter.currentFileIterator != nil {
+ logEntry, err = iter.currentFileIterator.getNext()
+ // Both a successful read and a real error end the call; only io.EOF
+ // falls through to open the next file.
+ if err != io.EOF {
+ return
+ }
+ }
+ // now either iter.currentFileIterator is nil or err is io.EOF
+ if iter.q.Len() == 0 {
+ return nil, io.EOF
+ }
+ t := iter.q.Dequeue()
+ if t == nil {
+ continue
+ }
+ // skip the file if it is after the stopTsNs
+ if iter.stopTsNs != 0 && t.TsNs > iter.stopTsNs {
+ return nil, io.EOF
+ }
+ next := iter.q.Peek()
+ if next == nil {
+ // t was the last queued file for this filer: load the next day
+ // directory now. io.EOF only means there is no further day to load.
+ if collectErr := v.logFileEntryCollector.collectMore(v); collectErr != nil && collectErr != io.EOF {
+ return nil, collectErr
+ }
+ }
+ // skip the file if the next entry is before the startTsNs
+ // (next was peeked before collectMore ran, so files queued by that call
+ // are not considered here; presumably a following file that starts at or
+ // before startTsNs means every entry in t is too old - TODO confirm)
+ if next != nil && next.TsNs <= iter.startTsNs {
+ continue
+ }
+ iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
+ }
+}
+
+// ----------
+
+// LogFileIterator decodes the log entries of a single persisted log file.
+// getNext reads each record as a 4-byte size followed by that many bytes of
+// protobuf-encoded filer_pb.LogEntry.
+type LogFileIterator struct {
+ // r streams the file content.
+ r io.Reader
+ // sizeBuf is the reusable 4-byte buffer for the record size prefix.
+ sizeBuf []byte
+ // Entries with TsNs <= startTsNs are skipped.
+ startTsNs int64
+ // Iteration ends at the first entry with TsNs > stopTsNs; zero means no upper bound.
+ stopTsNs int64
+}
+
+// newLogFileIterator returns an iterator that streams the chunks of fileEntry
+// and yields the log entries that fall inside the start/stop bounds.
+func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
+	it := new(LogFileIterator)
+	it.r = NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks)
+	it.sizeBuf = make([]byte, 4)
+	it.startTsNs = startTsNs
+	it.stopTsNs = stopTsNs
+	return it
+}
+
+// getNext returns the next log entry of the file whose TsNs is greater than
+// startTsNs.
+//
+// It returns io.EOF when the file is exhausted or when an entry newer than a
+// non-zero stopTsNs is reached. A record cut short in the middle of its size
+// prefix or payload is reported as an error. The returned entry is always nil
+// when err is non-nil.
+func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
+	for {
+		// io.ReadFull keeps reading until the buffer is full: a single Read
+		// may legally return fewer bytes than requested, which must not be
+		// mistaken for a corrupt record.
+		n, readErr := io.ReadFull(iter.r, iter.sizeBuf)
+		if readErr != nil {
+			if readErr == io.ErrUnexpectedEOF {
+				return nil, fmt.Errorf("size %d bytes, expected 4 bytes", n)
+			}
+			// io.EOF here means a clean end of file between records.
+			return nil, readErr
+		}
+		size := util.BytesToUint32(iter.sizeBuf)
+		entryData := make([]byte, size)
+		n, readErr = io.ReadFull(iter.r, entryData)
+		if readErr != nil {
+			if readErr == io.ErrUnexpectedEOF {
+				return nil, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size)
+			}
+			return nil, readErr
+		}
+		entry := &filer_pb.LogEntry{}
+		if err = proto.Unmarshal(entryData, entry); err != nil {
+			return nil, err
+		}
+		// Entries at or before the start position are skipped.
+		if entry.TsNs <= iter.startTsNs {
+			continue
+		}
+		if iter.stopTsNs != 0 && entry.TsNs > iter.stopTsNs {
+			return nil, io.EOF
+		}
+		return entry, nil
+	}
+}
diff --git a/weed/util/queue.go b/weed/util/queue.go
index 1437fe8be..69efc078f 100644
--- a/weed/util/queue.go
+++ b/weed/util/queue.go
@@ -61,3 +61,14 @@ func (q *Queue[T]) Dequeue() (result T) {
return n.data
}
+
+// Peek returns the element at the head of the queue without removing it, or
+// the zero value of T when the queue is empty. It holds the read lock for the
+// duration of the call.
+func (q *Queue[T]) Peek() (result T) {
+	q.RLock()
+	defer q.RUnlock()
+
+	if q.head != nil {
+		result = q.head.data
+	}
+	return result
+}