aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-02-25 19:54:03 -0800
committerchrislu <chris.lu@gmail.com>2025-02-25 19:54:40 -0800
commitb977e0b3b2f15dda04f55176e74b225e4a73e12b (patch)
treeee6c26f7e15d2ab9e1ef77124b53aa045320c07f /weed/server/filer_grpc_server_sub_meta.go
parent2dc26a064c1c568ecda6d8c3cb53c24ccbb3aaea (diff)
downloadseaweedfs-b977e0b3b2f15dda04f55176e74b225e4a73e12b.tar.xz
seaweedfs-b977e0b3b2f15dda04f55176e74b225e4a73e12b.zip
minor
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go26
1 files changed, 22 insertions, 4 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index f4c6bfe9d..dfe594b46 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "errors"
"fmt"
"strings"
"sync/atomic"
@@ -24,7 +25,8 @@ const (
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
- peerAddress := findClientAddress(stream.Context(), 0)
+ ctx := stream.Context()
+ peerAddress := findClientAddress(ctx, 0)
isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
if isReplacing {
@@ -81,17 +83,24 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ }
+
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
- if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
- if readInMemoryLogErr != log_buffer.ResumeError {
+ if !errors.Is(readInMemoryLogErr, log_buffer.ResumeError) {
break
}
}
@@ -112,7 +121,8 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
- peerAddress := findClientAddress(stream.Context(), 0)
+ ctx := stream.Context()
+ peerAddress := findClientAddress(ctx, 0)
// use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
req.ClientId = -req.ClientId
@@ -165,6 +175,14 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
+
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ }
+
fs.listenersLock.Lock()
atomic.AddInt64(&fs.listenersWaits, 1)
fs.listenersCond.Wait()