diff options
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 436d6746a..f4c6bfe9d 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,11 +2,12 @@ package weed_server import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "strings" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } + glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs) if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) + } else { + nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %v", err) + } + if found { + lastReadTime = position + } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() - if !fs.hasClient(req.ClientId, req.ClientEpoch) { - return false - } - return true + return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { |
