diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-20 00:08:47 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-20 00:08:47 -0700 |
| commit | ebfab42a5090403e1f61e34c8c6fba731f61fea2 (patch) | |
| tree | 511f5c7ac0b54f095066a35ea07638fe969da3a9 /weed/server | |
| parent | 2955b96ef1f05cc395b2625d8b1ff4556e683081 (diff) | |
| download | seaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.tar.xz seaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.zip | |
refactoring
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 85 |
1 files changed, 49 insertions, 36 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index e3de57145..6dd423007 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -1,9 +1,12 @@ package weed_server import ( + "fmt" "strings" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -23,48 +26,58 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, req.SinceNs) } - var readErr error - for { - - lastReadTime, readErr = fs.filer.ReadLogBuffer(lastReadTime, func(dirPath string, eventNotification *filer_pb.EventNotification) error { - - // get complete path to the file or directory - var entryName string - if eventNotification.OldEntry != nil { - entryName = eventNotification.OldEntry.Name - } else if eventNotification.NewEntry != nil { - entryName = eventNotification.NewEntry.Name - } - - fullpath := util.Join(dirPath, entryName) - - // skip on filer internal meta logs - if strings.HasPrefix(fullpath, filer2.SystemLogDir) { - return nil - } - - if !strings.HasPrefix(fullpath, req.PathPrefix) { - return nil - } - - message := &filer_pb.SubscribeMetadataResponse{ - Directory: dirPath, - EventNotification: eventNotification, - } - if err := stream.Send(message); err != nil { - return err - } + eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification) error { + + // get complete path to the file or directory + var entryName string + if eventNotification.OldEntry != nil { + entryName = eventNotification.OldEntry.Name + } else if eventNotification.NewEntry != nil { + entryName = eventNotification.NewEntry.Name + } + + fullpath := util.Join(dirPath, entryName) + + // skip on filer internal meta logs + if strings.HasPrefix(fullpath, filer2.SystemLogDir) { return nil - }) - if readErr != nil { - glog.V(0).Infof("=> client %v: %+v", clientName, readErr) - return readErr } + if !strings.HasPrefix(fullpath, req.PathPrefix) { + return nil + } + + message := &filer_pb.SubscribeMetadataResponse{ + Directory: dirPath, + EventNotification: eventNotification, + } + if err := stream.Send(message); err != nil { + glog.V(0).Infof("=> client %v: %+v", clientName, err) + return err + } + return nil + } + + _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() - } + return true + }, func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification); err != nil { + return err + } + + return nil + }) + + return err } |
