aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorBruce <half-life@jibudata.com>2024-10-31 23:40:05 +0800
committerGitHub <noreply@github.com>2024-10-31 08:40:05 -0700
commit0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a (patch)
tree167b1596f2f66a9ba0f2a54800f196446da4ae84 /weed/server/filer_grpc_server_sub_meta.go
parentc29c912bdccd60a4de11c382cdab1819197216e6 (diff)
downloadseaweedfs-0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a.tar.xz
seaweedfs-0060a2cf9ca25f89a252538bf6ca5ac62e6aa65a.zip
Fix 6181/6182 (#6183)
* set larger buf size for LogBuffer * jump to next day when no more entry found * Update weed/filer/filer_notify_read.go --------- Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go19
1 files changed, 14 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 436d6746a..f4c6bfe9d 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,11 +2,12 @@ package weed_server
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/stats"
"strings"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
+ glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs)
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
+ } else {
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %v", err)
+ }
+ if found {
+ lastReadTime = position
+ }
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
- if !fs.hasClient(req.ClientId, req.ClientEpoch) {
- return false
- }
- return true
+ return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {