diff options
| author | Bruce <half-life@jibudata.com> | 2024-10-31 23:40:05 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-31 08:40:05 -0700 |
| commit | 0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a (patch) | |
| tree | 167b1596f2f66a9ba0f2a54800f196446da4ae84 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | c29c912bdccd60a4de11c382cdab1819197216e6 (diff) | |
| download | seaweedfs-0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a.tar.xz seaweedfs-0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a.zip | |
Fix 6181/6182 (#6183)
* set larger buf size for LogBuffer
* jump to next day when no more entry found
* Update weed/filer/filer_notify_read.go
---------
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 436d6746a..f4c6bfe9d 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,11 +2,12 @@ package weed_server import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "strings" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } + glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs) if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) + } else { + nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %v", err) + } + if found { + lastReadTime = position + } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() - if !fs.hasClient(req.ClientId, req.ClientEpoch) { - return false - } - return true + return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { |
