diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 29 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 4 |
2 files changed, 19 insertions, 14 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 82261ca51..8d6dd987f 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,11 +24,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, peerAddress := findClientAddress(stream.Context(), 0) - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) if alreadyKnown { return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } - defer fs.deleteClient(clientName, req.ClientId) + defer fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -93,13 +93,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata() req.ClientId = -req.ClientId - alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId, req.ClientEpoch) if alreadyKnown { return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId) } defer func() { glog.V(0).Infof(" - %v local subscribe %s clientId:%d", clientName, req.PathPrefix, req.ClientId) - fs.deleteClient(clientName, req.ClientId) + fs.deleteClient(clientName, req.ClientId, req.ClientEpoch) }() lastReadTime := time.Unix(0, req.SinceNs) @@ -263,25 +263,30 @@ func hasPrefixIn(text string, prefixes []string) bool { return false } -func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32) (alreadyKnown bool, clientName string) { +func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32, clientEpoch int32) (alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ listener %v", clientName) if clientId != 0 { fs.knownListenersLock.Lock() - _, alreadyKnown = fs.knownListeners[clientId] - if !alreadyKnown { - fs.knownListeners[clientId] = struct{}{} + defer fs.knownListenersLock.Unlock() + epoch, found := fs.knownListeners[clientId] + if !found || epoch < clientEpoch { + fs.knownListeners[clientId] = clientEpoch + } else { + alreadyKnown = true } - fs.knownListenersLock.Unlock() } return } -func (fs *FilerServer) deleteClient(clientName string, clientId int32) { +func (fs *FilerServer) deleteClient(clientName string, clientId int32, clientEpoch int32) { glog.V(0).Infof("- listener %v", clientName) if clientId != 0 { fs.knownListenersLock.Lock() - delete(fs.knownListeners, clientId) - fs.knownListenersLock.Unlock() + defer fs.knownListenersLock.Unlock() + epoch, found := fs.knownListeners[clientId] + if found && epoch <= clientEpoch { + delete(fs.knownListeners, clientId) + } } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 6bf0261ee..6ccb33996 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -88,7 +88,7 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex - knownListeners map[int32]struct{} + knownListeners map[int32]int32 brokers map[string]map[string]bool brokersLock sync.Mutex @@ -111,7 +111,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), - knownListeners: make(map[int32]struct{}), + knownListeners: make(map[int32]int32), brokers: make(map[string]map[string]bool), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } |
