diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-07-05 15:50:07 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-07-05 15:50:07 -0700 |
| commit | 70d8a3a1d3395ece4a37c5aac1249293b57d5975 (patch) | |
| tree | aad0a083ecf06b6462cd8ed20e0629f75ad69577 /weed/server | |
| parent | 55e40b08fc5f796c8d315b100a9c29dcf3a092e6 (diff) | |
| download | seaweedfs-70d8a3a1d3395ece4a37c5aac1249293b57d5975.tar.xz seaweedfs-70d8a3a1d3395ece4a37c5aac1249293b57d5975.zip | |
add SubscribeLocalMetadata without checking persisted meta logs
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index d069a45c2..c82a58b16 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -48,6 +48,32 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } +func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error { + + peerAddress := findClientAddress(stream.Context(), 0) + + clientName := fs.addClient(req.ClientName, peerAddress) + + defer fs.deleteClient(clientName) + + lastReadTime := time.Unix(0, req.SinceNs) + glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + + eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + + err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + + return err + +} + func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { return func(logEntry *filer_pb.LogEntry) error { event := &filer_pb.SubscribeMetadataResponse{} |
