aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go57
-rw-r--r--weed/util/log_buffer/log_buffer.go2
2 files changed, 36 insertions, 23 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 18505a95f..624069b7e 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -30,44 +30,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
- time.Sleep(5127 * time.Millisecond)
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
- time.Sleep(5127 * time.Millisecond)
+ time.Sleep(1127 * time.Millisecond)
}
- return err
+ return readInMemoryLogErr
}
@@ -87,41 +92,47 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- time.Sleep(3127 * time.Millisecond)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ time.Sleep(1127 * time.Millisecond)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
}
- return err
+ return readInMemoryLogErr
}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 12840a88a..4742c2b7c 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -170,6 +170,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
m.lastFlushTime = m.stopTime
}
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+ m.startTime = time.Unix(0,0)
+ m.stopTime = time.Unix(0,0)
m.pos = 0
m.idx = m.idx[:0]
return d