diff options
| author | chrislu <chris.lu@gmail.com> | 2024-06-13 21:48:57 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-06-13 21:48:57 -0700 |
| commit | 8ffda89ed60526a0162c9de90063dbd208b7a4c7 (patch) | |
| tree | c7364e21e6d036741c9dbc4d7c8bb2e482b3c436 | |
| parent | 6d95f781649fc7b3384b7b2c2ff3fedf011f1297 (diff) | |
| parent | b03c831ad24d67f6dc429c13d7ce07142320a0ac (diff) | |
| download | seaweedfs-8ffda89ed60526a0162c9de90063dbd208b7a4c7.tar.xz seaweedfs-8ffda89ed60526a0162c9de90063dbd208b7a4c7.zip | |
Merge branch 'master' into mq
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 7 |
1 files changed, 6 insertions, 1 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 436c4158f..5deb9d7ca 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -31,7 +31,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } else if alreadyKnown { return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } - defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) + defer func() { + glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) + fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) + fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting + }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -112,6 +116,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq defer func() { glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) + fs.listenersCond.Broadcast() // nudges the subscribers that are waiting }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) |
