aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-20 00:08:47 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-20 00:08:47 -0700
commitebfab42a5090403e1f61e34c8c6fba731f61fea2 (patch)
tree511f5c7ac0b54f095066a35ea07638fe969da3a9
parent2955b96ef1f05cc395b2625d8b1ff4556e683081 (diff)
downloadseaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.tar.xz
seaweedfs-ebfab42a5090403e1f61e34c8c6fba731f61fea2.zip
refactoring
-rw-r--r--weed/filer2/filer_notify.go49
-rw-r--r--weed/server/filer_grpc_server_listen.go85
2 files changed, 49 insertions, 85 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index ac7ae4e4c..e5356472a 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -1,7 +1,6 @@
package filer2
import (
- "bytes"
"fmt"
"strings"
"time"
@@ -79,51 +78,3 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
glog.V(0).Infof("log write failed %s: %v", targetFile, err)
}
}
-
-func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) {
-
- var bytesBuf *bytes.Buffer
- bytesBuf = f.MetaLogBuffer.ReadFromBuffer(lastReadTime)
- if bytesBuf == nil {
- return
- }
- defer f.MetaLogBuffer.ReleaseMeory(bytesBuf)
- buf := bytesBuf.Bytes()
- var processedTs int64
-
- for pos := 0; pos+4 < len(buf); {
-
- size := util.BytesToUint32(buf[pos : pos+4])
- entryData := buf[pos+4 : pos+4+int(size)]
-
- logEntry := &filer_pb.LogEntry{}
- err = proto.Unmarshal(entryData, logEntry)
- if err != nil {
- glog.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
- return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
- }
-
- event := &filer_pb.SubscribeMetadataResponse{}
- err = proto.Unmarshal(logEntry.Data, event)
- if err != nil {
- glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- }
-
- err = eachEventFn(event.Directory, event.EventNotification)
-
- processedTs = logEntry.TsNs
-
- if err != nil {
- newLastReadTime = time.Unix(0, processedTs)
- return
- }
-
- pos += 4 + int(size)
-
- }
-
- newLastReadTime = time.Unix(0, processedTs)
- return
-
-}
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index e3de57145..6dd423007 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -1,9 +1,12 @@
package weed_server
import (
+ "fmt"
"strings"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -23,48 +26,58 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, req.SinceNs)
}
- var readErr error
- for {
-
- lastReadTime, readErr = fs.filer.ReadLogBuffer(lastReadTime, func(dirPath string, eventNotification *filer_pb.EventNotification) error {
-
- // get complete path to the file or directory
- var entryName string
- if eventNotification.OldEntry != nil {
- entryName = eventNotification.OldEntry.Name
- } else if eventNotification.NewEntry != nil {
- entryName = eventNotification.NewEntry.Name
- }
-
- fullpath := util.Join(dirPath, entryName)
-
- // skip on filer internal meta logs
- if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
- return nil
- }
-
- if !strings.HasPrefix(fullpath, req.PathPrefix) {
- return nil
- }
-
- message := &filer_pb.SubscribeMetadataResponse{
- Directory: dirPath,
- EventNotification: eventNotification,
- }
- if err := stream.Send(message); err != nil {
- return err
- }
+ eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification) error {
+
+ // get complete path to the file or directory
+ var entryName string
+ if eventNotification.OldEntry != nil {
+ entryName = eventNotification.OldEntry.Name
+ } else if eventNotification.NewEntry != nil {
+ entryName = eventNotification.NewEntry.Name
+ }
+
+ fullpath := util.Join(dirPath, entryName)
+
+ // skip on filer internal meta logs
+ if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
return nil
- })
- if readErr != nil {
- glog.V(0).Infof("=> client %v: %+v", clientName, readErr)
- return readErr
}
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ return nil
+ }
+
+ message := &filer_pb.SubscribeMetadataResponse{
+ Directory: dirPath,
+ EventNotification: eventNotification,
+ }
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ return err
+ }
+ return nil
+ }
+
+ _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
- }
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ event := &filer_pb.SubscribeMetadataResponse{}
+ if err := proto.Unmarshal(logEntry.Data, event); err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ }
+
+ if err := eachEventNotificationFn(event.Directory, event.EventNotification); err != nil {
+ return err
+ }
+
+ return nil
+ })
+
+ return err
}