diff options
Diffstat (limited to 'weed/pb/filer_pb_tail.go')
| -rw-r--r-- | weed/pb/filer_pb_tail.go | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go new file mode 100644 index 000000000..31fb62fb3 --- /dev/null +++ b/weed/pb/filer_pb_tail.go @@ -0,0 +1,94 @@ +package pb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc" + "io" + "time" +) + +type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error + +func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, + clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) error { + + err := WithFilerClient(filerAddress, grpcDialOption, makeFunc( + clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + if err != nil { + return fmt.Errorf("subscribing filer meta change: %v", err) + } + return err +} + +func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient, + clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) error { + + err := filerClient.WithFilerClient(makeFunc( + clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError)) + if err != nil { + return fmt.Errorf("subscribing filer meta change: %v", err) + } + + return nil +} + +func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32, + processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error { + return func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: clientName, + PathPrefix: pathPrefix, + SinceNs: lastTsNs, + Signature: selfSignature, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + if fatalOnError { + glog.Fatalf("process %v: %v", resp, err) + } else { + glog.Errorf("process %v: %v", resp, err) + } + } + lastTsNs = resp.TsNs + } + } +} + +func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { + var counter int64 + var lastWriteTime time.Time + return func(resp *filer_pb.SubscribeMetadataResponse) error { + if err := processEventFn(resp); err != nil { + return err + } + counter++ + if lastWriteTime.Add(offsetInterval).Before(time.Now()) { + counter = 0 + lastWriteTime = time.Now() + if err := offsetFunc(counter, resp.TsNs); err != nil { + return err + } + } + return nil + } + +} |
