aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorqieqieplus <admin@qieqie.me>2021-07-05 16:01:16 +0800
committerqieqieplus <admin@qieqie.me>2021-07-05 16:01:16 +0800
commit233103f6b2d8104acffeb7a5b91600974430c762 (patch)
tree7bb8a398e693af4f32003c8204fe972dac5db040 /weed/server/filer_grpc_server_sub_meta.go
parentf5fa0b08fd2b10de88bcb4e7c16ca3e844e1831c (diff)
downloadseaweedfs-233103f6b2d8104acffeb7a5b91600974430c762.tar.xz
seaweedfs-233103f6b2d8104acffeb7a5b91600974430c762.zip
sync empty notification with timestamp
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go29
1 files changed, 24 insertions, 5 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 624069b7e..3fdac1b26 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,7 +2,6 @@ package weed_server
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"strings"
"time"
@@ -12,6 +11,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+const (
+ // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
+ MaxUnsyncedEvents = 1e3
)
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
@@ -25,7 +30,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -87,7 +92,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -152,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
}
}
-func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ filtered := 0
+
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ defer func() {
+ if filtered > MaxUnsyncedEvents {
+ if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
+ EventNotification: &filer_pb.EventNotification{},
+ TsNs: tsNs,
+ }); err == nil {
+ filtered = 0
+ }
+ }
+ }()
+ filtered++
foundSelf := false
for _, sig := range eventNotification.Signatures {
- if sig == clientSignature && clientSignature != 0 {
+ if sig == req.Signature && req.Signature != 0 {
return nil
}
if sig == fs.filer.Signature {
@@ -204,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err
}
+ filtered = 0
return nil
}
}