diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-07-01 14:01:25 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-07-01 14:01:25 -0700 |
| commit | 2420c60fc418638aef936ac3fae5f5b342e7cc42 (patch) | |
| tree | 2593af33fbb140b764b2cb285912a2e71c00eca5 /weed | |
| parent | 067eb15e707378c10b5d897c39bb681110ab268a (diff) | |
| download | seaweedfs-2420c60fc418638aef936ac3fae5f5b342e7cc42.tar.xz seaweedfs-2420c60fc418638aef936ac3fae5f5b342e7cc42.zip | |
log reading adds delay between retries
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 57 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 2 |
2 files changed, 36 insertions, 23 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 18505a95f..624069b7e 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -30,44 +30,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) var processedTsNs int64 - var err error + var readPersistedLogErr error + var readInMemoryLogErr error for { glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) + processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if readPersistedLogErr != nil { + return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) + } else { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + time.Sleep(1127 * time.Millisecond) + continue + } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() return true }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { - time.Sleep(5127 * time.Millisecond) + if readInMemoryLogErr != nil { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { continue } - glog.Errorf("processed to %v: %v", lastReadTime, err) - if err != log_buffer.ResumeError { + glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + if readInMemoryLogErr != log_buffer.ResumeError { break } } - time.Sleep(5127 * time.Millisecond) + time.Sleep(1127 * time.Millisecond) } - return err + return readInMemoryLogErr } @@ -87,41 +92,47 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) var processedTsNs int64 - var err error + var readPersistedLogErr error + var readInMemoryLogErr error for { // println("reading from persisted logs ...") glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) + processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if readPersistedLogErr != nil { + return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) + } else { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + time.Sleep(1127 * time.Millisecond) + continue + } } glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() return true }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { + if readInMemoryLogErr != nil { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { continue } - glog.Errorf("processed to %v: %v", lastReadTime, err) - time.Sleep(3127 * time.Millisecond) - if err != log_buffer.ResumeError { + glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + time.Sleep(1127 * time.Millisecond) + if readInMemoryLogErr != log_buffer.ResumeError { break } } } - return err + return readInMemoryLogErr } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 12840a88a..4742c2b7c 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -170,6 +170,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { m.lastFlushTime = m.stopTime } m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) + m.startTime = time.Unix(0,0) + m.stopTime = time.Unix(0,0) m.pos = 0 m.idx = m.idx[:0] return d |
