diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
| commit | 64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch) | |
| tree | 30903fa25af0bcf797f93e354f68fb41216dc602 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | 2c8818351f418e3584a6c5410c396f747aebd725 (diff) | |
| download | seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip | |
metadata subscription uses client epoch
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 82261ca51..8d6dd987f 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,11 +24,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, peerAddress := findClientAddress(stream.Context(), 0) - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) if alreadyKnown { return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } - defer fs.deleteClient(clientName, req.ClientId) + defer fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -93,13 +93,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata() req.ClientId = -req.ClientId - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) if alreadyKnown { return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId) } defer func() { glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId) - fs.deleteClient(clientName, req.ClientId) + fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) }() lastReadTime := time.Unix(0, req.SinceNs) @@ -263,25 +263,30 @@ func hasPrefixIn(text string, prefixes []string) bool { return false } -func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32) (alreadyKnown bool, clientName string) { +func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32, clientEpoch int32) (alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ listener %v", clientName) if clientId != 0 { fs.knownListenersLock.Lock() - _, alreadyKnown = fs.knownListeners[clientId] - if !alreadyKnown { - fs.knownListeners[clientId] = struct{}{} + defer fs.knownListenersLock.Unlock() + epoch, found := fs.knownListeners[clientId] + if !found || epoch < clientEpoch { + fs.knownListeners[clientId] = clientEpoch + } else { + alreadyKnown = true } - fs.knownListenersLock.Unlock() } return } -func (fs *FilerServer) deleteClient(clientName string, clientId int32) { +func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpoch int32) { glog.V(0).Infof("- listener %v", clientName) if clientId != 0 { fs.knownListenersLock.Lock() - delete(fs.knownListeners, clientId) - fs.knownListenersLock.Unlock() + defer fs.knownListenersLock.Unlock() + epoch, found := fs.knownListeners[clientId] + if found && epoch <= clientEpoch { + delete(fs.knownListeners, clientId) + } } } |
