diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-07-05 15:43:06 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-07-05 15:43:06 -0700 |
| commit | 55e40b08fc5f796c8d315b100a9c29dcf3a092e6 (patch) | |
| tree | 9a80958b0693a09e007fc22266669c68a7c55cb1 /weed/server | |
| parent | 881e0fde2e82293c779dfcc8a339122dec8a8ef6 (diff) | |
| download | seaweedfs-55e40b08fc5f796c8d315b100a9c29dcf3a092e6.tar.xz seaweedfs-55e40b08fc5f796c8d315b100a9c29dcf3a092e6.zip | |
refactoring
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 848a1fc3a..d069a45c2 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -23,9 +23,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - var processedTsNs int64 - eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + + eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + + err = fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + + return err + +} + +func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { + return func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { + return err + } + + return nil + } +} + +func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { // get complete path to the file or directory var entryName string @@ -57,40 +97,6 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } return nil } - - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - event := &filer_pb.SubscribeMetadataResponse{} - if err := proto.Unmarshal(logEntry.Data, event); err != nil { - glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - } - - if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { - return err - } - - processedTsNs = logEntry.TsNs - - return nil - } - - if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - - err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, eachLogEntryFn) - - return err - } func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { |
