aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-20 00:08:47 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-20 00:08:47 -0700
commitebfab42a5090403e1f61e34c8c6fba731f61fea2 (patch)
tree511f5c7ac0b54f095066a35ea07638fe969da3a9 /weed/server
parent2955b96ef1f05cc395b2625d8b1ff4556e683081 (diff)
downloadseaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.tar.xz
seaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.zip
refactoring
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_listen.go85
1 files changed, 49 insertions, 36 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index e3de57145..6dd423007 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -1,9 +1,12 @@
package weed_server
import (
+ "fmt"
"strings"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,48 +26,58 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, req.SinceNs)
}
- var readErr error
- for {
-
- lastReadTime, readErr = fs.filer.ReadLogBuffer(lastReadTime, func(dirPath string, eventNotification *filer_pb.EventNotification) error {
-
- // get complete path to the file or directory
- var entryName string
- if eventNotification.OldEntry != nil {
- entryName = eventNotification.OldEntry.Name
- } else if eventNotification.NewEntry != nil {
- entryName = eventNotification.NewEntry.Name
- }
-
- fullpath := util.Join(dirPath, entryName)
-
- // skip on filer internal meta logs
- if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
- return nil
- }
-
- if !strings.HasPrefix(fullpath, req.PathPrefix) {
- return nil
- }
-
- message := &filer_pb.SubscribeMetadataResponse{
- Directory: dirPath,
- EventNotification: eventNotification,
- }
- if err := stream.Send(message); err != nil {
- return err
- }
+ eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification) error {
+
+ // get complete path to the file or directory
+ var entryName string
+ if eventNotification.OldEntry != nil {
+ entryName = eventNotification.OldEntry.Name
+ } else if eventNotification.NewEntry != nil {
+ entryName = eventNotification.NewEntry.Name
+ }
+
+ fullpath := util.Join(dirPath, entryName)
+
+ // skip on filer internal meta logs
+ if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
return nil
- })
- if readErr != nil {
- glog.V(0).Infof("=> client %v: %+v", clientName, readErr)
- return readErr
}
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ return nil
+ }
+
+ message := &filer_pb.SubscribeMetadataResponse{
+ Directory: dirPath,
+ EventNotification: eventNotification,
+ }
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ return err
+ }
+ return nil
+ }
+
+ _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
- }
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ event := &filer_pb.SubscribeMetadataResponse{}
+ if err := proto.Unmarshal(logEntry.Data, event); err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ }
+
+ if err := eachEventNotificationFn(event.Directory, event.EventNotification); err != nil {
+ return err
+ }
+
+ return nil
+ })
+
+ return err
}