diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-01-11 02:08:55 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-01-11 02:08:55 -0800 |
| commit | 394513f598f5fac7889086ab6420d6ccf1b8d53a (patch) | |
| tree | 5561788c7ca45083767c232a10fb8f63e0782733 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | 6e12a3a490fe5bc4f57fccf51a9fbdc9d1675ca5 (diff) | |
| download | seaweedfs-394513f598f5fac7889086ab6420d6ccf1b8d53a.tar.xz seaweedfs-394513f598f5fac7889086ab6420d6ccf1b8d53a.zip | |
filer: ensure seamless meta data updates
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3b8ced675..d9f91b125 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -29,16 +29,20 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } + var processedTsNs int64 + var err error for { + + processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() @@ -46,6 +50,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return true }, eachLogEntryFn) if err != nil { + if err == log_buffer.ResumeFromDiskError { + continue + } glog.Errorf("processed to %v: %v", lastReadTime, err) time.Sleep(3127 * time.Millisecond) if err != log_buffer.ResumeError { @@ -73,19 +80,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - // println("reading from persisted logs ...") - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + var processedTsNs int64 + var err error - // println("reading from in memory logs ...") for { + // println("reading from persisted logs ...") + processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + // println("reading from in memory logs ...") + lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() @@ -93,6 +104,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq return true }, eachLogEntryFn) if err != nil { + if err == log_buffer.ResumeFromDiskError { + continue + } glog.Errorf("processed to %v: %v", lastReadTime, err) time.Sleep(3127 * time.Millisecond) if err != log_buffer.ResumeError { |
