aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/filer_pb_tail.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/filer_pb_tail.go')
-rw-r--r--weed/pb/filer_pb_tail.go94
1 files changed, 94 insertions, 0 deletions
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
new file mode 100644
index 000000000..31fb62fb3
--- /dev/null
+++ b/weed/pb/filer_pb_tail.go
@@ -0,0 +1,94 @@
+package pb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
+
+func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
+ clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+
+ err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
+ clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ if err != nil {
+ return fmt.Errorf("subscribing filer meta change: %v", err)
+ }
+ return err
+}
+
+func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
+ clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+
+ err := filerClient.WithFilerClient(makeFunc(
+ clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ if err != nil {
+ return fmt.Errorf("subscribing filer meta change: %v", err)
+ }
+
+ return nil
+}
+
+func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
+ return func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: clientName,
+ PathPrefix: pathPrefix,
+ SinceNs: lastTsNs,
+ Signature: selfSignature,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ if fatalOnError {
+ glog.Fatalf("process %v: %v", resp, err)
+ } else {
+ glog.Errorf("process %v: %v", resp, err)
+ }
+ }
+ lastTsNs = resp.TsNs
+ }
+ }
+}
+
+func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
+ var counter int64
+ var lastWriteTime time.Time
+ return func(resp *filer_pb.SubscribeMetadataResponse) error {
+ if err := processEventFn(resp); err != nil {
+ return err
+ }
+ counter++
+ if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := offsetFunc(counter, resp.TsNs); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+}