aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer_notify.go14
-rw-r--r--weed/filer/filer_notify_read.go69
-rw-r--r--weed/filer/stream.go10
3 files changed, 78 insertions, 15 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 45c9b070f..6fd595f87 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -152,9 +152,21 @@ func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTim
}
var (
- VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
+ volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
+ chunkNotFoundPattern = regexp.MustCompile(`(urls not found|File Not Found)`)
)
+// isChunkNotFoundError checks if the error indicates that a volume or chunk
+// has been deleted and is no longer available. These errors can be skipped
+// when reading persisted log files since the data is unrecoverable.
+func isChunkNotFoundError(err error) bool {
+ if err == nil {
+ return false
+ }
+ errMsg := err.Error()
+ return volumeNotFoundPattern.MatchString(errMsg) || chunkNotFoundPattern.MatchString(errMsg)
+}
+
func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
visitor, visitErr := f.collectPersistedLogBuffer(startPosition, stopTsNs)
diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go
index 62cede687..a46f3fb71 100644
--- a/weed/filer/filer_notify_read.go
+++ b/weed/filer/filer_notify_read.go
@@ -265,11 +265,12 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
// ----------
type LogFileQueueIterator struct {
- q *util.Queue[*LogFileEntry]
- masterClient *wdclient.MasterClient
- startTsNs int64
- stopTsNs int64
- currentFileIterator *LogFileIterator
+ q *util.Queue[*LogFileEntry]
+ masterClient *wdclient.MasterClient
+ startTsNs int64
+ stopTsNs int64
+ pendingEntries []*filer_pb.LogEntry
+ pendingIndex int
}
func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[*LogFileEntry], startTsNs, stopTsNs int64) *LogFileQueueIterator {
@@ -284,13 +285,17 @@ func newLogFileQueueIterator(masterClient *wdclient.MasterClient, q *util.Queue[
// getNext will return io.EOF when done
func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer_pb.LogEntry, err error) {
for {
- if iter.currentFileIterator != nil {
- logEntry, err = iter.currentFileIterator.getNext()
- if err != io.EOF {
- return
- }
+ // return pending entries first
+ if iter.pendingIndex < len(iter.pendingEntries) {
+ logEntry = iter.pendingEntries[iter.pendingIndex]
+ iter.pendingIndex++
+ return logEntry, nil
}
- // now either iter.currentFileIterator is nil or err is io.EOF
+ // reset for next file
+ iter.pendingEntries = nil
+ iter.pendingIndex = 0
+
+ // read entries from next file
if iter.q.Len() == 0 {
return nil, io.EOF
}
@@ -313,7 +318,38 @@ func (iter *LogFileQueueIterator) getNext(v *OrderedLogVisitor) (logEntry *filer
if next != nil && next.TsNs <= iter.startTsNs {
continue
}
- iter.currentFileIterator = newLogFileIterator(iter.masterClient, t.FileEntry, iter.startTsNs, iter.stopTsNs)
+
+ // read all entries from this file
+ iter.pendingEntries, err = iter.readFileEntries(t.FileEntry)
+ if err != nil {
+ return nil, err
+ }
+ }
+}
+
+// readFileEntries reads all log entries from a single file
+func (iter *LogFileQueueIterator) readFileEntries(fileEntry *Entry) (entries []*filer_pb.LogEntry, err error) {
+ fileIterator := newLogFileIterator(iter.masterClient, fileEntry, iter.startTsNs, iter.stopTsNs)
+ defer func() {
+ if closeErr := fileIterator.Close(); closeErr != nil && err == nil {
+ err = closeErr
+ }
+ }()
+
+ for {
+ logEntry, err := fileIterator.getNext()
+ if err == io.EOF {
+ return entries, nil
+ }
+ if err != nil {
+ if isChunkNotFoundError(err) {
+ // Volume or chunk was deleted, skip the rest of this log file
+ glog.Warningf("skipping rest of %s: %v", fileIterator.filePath, err)
+ return entries, nil
+ }
+ return nil, err
+ }
+ entries = append(entries, logEntry)
}
}
@@ -324,6 +360,7 @@ type LogFileIterator struct {
sizeBuf []byte
startTsNs int64
stopTsNs int64
+ filePath string
}
func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
@@ -332,9 +369,17 @@ func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, s
sizeBuf: make([]byte, 4),
startTsNs: startTsNs,
stopTsNs: stopTsNs,
+ filePath: string(fileEntry.FullPath),
}
}
+func (iter *LogFileIterator) Close() error {
+ if r, ok := iter.r.(io.Closer); ok {
+ return r.Close()
+ }
+ return nil
+}
+
// getNext will return io.EOF when done
func (iter *LogFileIterator) getNext() (logEntry *filer_pb.LogEntry, err error) {
var n int
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index b2ee00555..00539ca20 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -245,6 +245,7 @@ type ChunkStreamReader struct {
var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{})
+var _ = io.Closer(&ChunkStreamReader{})
func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
@@ -403,8 +404,13 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return nil
}
-func (c *ChunkStreamReader) Close() {
- // TODO try to release and reuse buffer
+func (c *ChunkStreamReader) Close() error {
+ c.bufferLock.Lock()
+ defer c.bufferLock.Unlock()
+ c.buffer = nil
+ c.head = nil
+ c.chunkView = nil
+ return nil
}
func VolumeId(fileId string) string {