diff options
Diffstat (limited to 'weed/filer/filer_notify.go')
| -rw-r--r-- | weed/filer/filer_notify.go | 112 |
1 files changed, 22 insertions, 90 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index db953d398..621d4c227 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "io" - "math" "regexp" "strings" "time" @@ -116,101 +115,34 @@ var ( func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) { - startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute()) - var stopDate, stopHourMinute string - if stopTsNs != 0 { - stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC() - stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day()) - stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute()) - } - - sizeBuf := make([]byte, 4) - startTsNs := startPosition.UnixNano() - - dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "") - if listDayErr != nil { - return lastTsNs, isDone, fmt.Errorf("fail to list log by day: %v", listDayErr) + visitor, visitErr := f.collectPersistedLogBuffer(startPosition, stopTsNs) + if visitErr != nil { + if visitErr == io.EOF { + return + } + err = fmt.Errorf("reading from persisted logs: %v", visitErr) + return } - for _, dayEntry := range dayEntries { - if stopDate != "" { - if strings.Compare(dayEntry.Name(), stopDate) > 0 { + var logEntry *filer_pb.LogEntry + for { + logEntry, visitErr = visitor.GetNext() + if visitErr != nil { + if visitErr == io.EOF { break } + err = fmt.Errorf("read next from persisted logs: %v", visitErr) + return } - // println("checking day", dayEntry.FullPath) - hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "") - if listHourMinuteErr != nil { - return lastTsNs, isDone, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) + isDone, visitErr = eachLogEntryFn(logEntry) + if visitErr != nil { + err = fmt.Errorf("process persisted log entry: %v", visitErr) + return } - for _, hourMinuteEntry := range hourMinuteEntries { - // println("checking hh-mm", hourMinuteEntry.FullPath) - if dayEntry.Name() == startDate { - hourMinute := util.FileNameBase(hourMinuteEntry.Name()) - if strings.Compare(hourMinute, startHourMinute) < 0 { - continue - } - } - if dayEntry.Name() == stopDate { - hourMinute := util.FileNameBase(hourMinuteEntry.Name()) - if strings.Compare(hourMinute, stopHourMinute) > 0 { - break - } - } - // println("processing", hourMinuteEntry.FullPath) - chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.GetChunks()) - if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, stopTsNs, eachLogEntryFn); err != nil { - chunkedFileReader.Close() - if err == io.EOF { - continue - } - if VolumeNotFoundPattern.MatchString(err.Error()) { - glog.Warningf("skipping reading %s: %v", hourMinuteEntry.FullPath, err) - continue - } - return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) - } - chunkedFileReader.Close() + lastTsNs = logEntry.TsNs + if isDone { + return } } - return lastTsNs, isDone, nil -} - -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) { - for { - n, err := r.Read(sizeBuf) - if err != nil { - return lastTsNs, err - } - if n != 4 { - return lastTsNs, fmt.Errorf("size %d bytes, expected 4 bytes", n) - } - size := util.BytesToUint32(sizeBuf) - // println("entry size", size) - entryData := make([]byte, size) - n, err = r.Read(entryData) - if err != nil { - return lastTsNs, err - } - if n != int(size) { - return lastTsNs, fmt.Errorf("entry data %d bytes, expected %d bytes", n, size) - } - logEntry := &filer_pb.LogEntry{} - if err = proto.Unmarshal(entryData, logEntry); err != nil { - return lastTsNs, err - } - if logEntry.TsNs <= startTsNs { - continue - } - if stopTsNs != 0 && logEntry.TsNs > stopTsNs { - return lastTsNs, err - } - // println("each log: ", logEntry.TsNs) - if _, err := eachLogEntryFn(logEntry); err != nil { - return lastTsNs, err - } else { - lastTsNs = logEntry.TsNs - } - } + return } |
