aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-10-13 16:48:14 -0700
committerchrislu <chris.lu@gmail.com>2023-10-13 16:48:14 -0700
commitcbc24c7b243282a6d72ac8a948fcc3122ee82ddb (patch)
treead7abb026526397167567a3a27afce9f0fea77d4 /weed/server/filer_grpc_server_sub_meta.go
parent6acb43bbbb9b15c938d246837a17dca66393409e (diff)
downloadseaweedfs-cbc24c7b243282a6d72ac8a948fcc3122ee82ddb.tar.xz
seaweedfs-cbc24c7b243282a6d72ac8a948fcc3122ee82ddb.zip
disconnect from old subscribers
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go52
1 files changed, 41 insertions, 11 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index bd24ab54f..eb69a6aeb 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -24,11 +24,13 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
peerAddress := findClientAddress(stream.Context(), 0)
- alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
- if alreadyKnown {
+ isReplacing, alreadyKnown, clientName := fs.addClient("", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
+ if isReplacing {
+ fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
+ } else if alreadyKnown {
return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
}
- defer fs.deleteClient(clientName, req.ClientId, req.ClientEpoch)
+ defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -64,6 +66,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
+ if !fs.hasClient(req.ClientId, req.ClientEpoch) {
+ return false
+ }
return true
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
@@ -78,6 +83,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if isDone {
return nil
}
+ if !fs.hasClient(req.ClientId, req.ClientEpoch) {
+ glog.V(0).Infof("client %v is closed", clientName)
+ return nil
+ }
time.Sleep(1127 * time.Millisecond)
}
@@ -93,13 +102,15 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
// use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
req.ClientId = -req.ClientId
- alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
- if alreadyKnown {
+ isReplacing, alreadyKnown, clientName := fs.addClient("local", req.ClientName, peerAddress, req.ClientId, req.ClientEpoch)
+ if isReplacing {
+ fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
+ } else if alreadyKnown {
return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
}
defer func() {
- glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
- fs.deleteClient(clientName, req.ClientId, req.ClientEpoch)
+ glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
+ fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
}()
lastReadTime := time.Unix(0, req.SinceNs)
@@ -141,6 +152,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
+ if !fs.hasClient(req.ClientId, req.ClientEpoch) {
+ return false
+ }
return true
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
@@ -155,6 +169,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
if isDone {
return nil
}
+ if !fs.hasClient(req.ClientId, req.ClientEpoch) {
+ return nil
+ }
}
return readInMemoryLogErr
@@ -274,15 +291,16 @@ func matchByDirectory(dirPath string, directories []string) bool {
return false
}
-func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32, clientEpoch int32) (alreadyKnown bool, clientName string) {
+func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) {
clientName = clientType + "@" + clientAddress
- glog.V(0).Infof("+ listener %v", clientName)
+ glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
if clientId != 0 {
fs.knownListenersLock.Lock()
defer fs.knownListenersLock.Unlock()
epoch, found := fs.knownListeners[clientId]
if !found || epoch < clientEpoch {
fs.knownListeners[clientId] = clientEpoch
+ isReplacing = true
} else {
alreadyKnown = true
}
@@ -290,8 +308,8 @@ func (fs *FilerServer) addClient(clientType string, clientAddress string, client
return
}
-func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpoch int32) {
- glog.V(0).Infof("- listener %v", clientName)
+func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) {
+ glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
if clientId != 0 {
fs.knownListenersLock.Lock()
defer fs.knownListenersLock.Unlock()
@@ -301,3 +319,15 @@ func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpo
}
}
}
+
+func (fs *FilerServer) hasClient(clientId int32, clientEpoch int32) bool {
+ if clientId != 0 {
+ fs.knownListenersLock.Lock()
+ defer fs.knownListenersLock.Unlock()
+ epoch, found := fs.knownListeners[clientId]
+ if found && epoch <= clientEpoch {
+ return true
+ }
+ }
+ return false
+}