aboutsummaryrefslogtreecommitdiff
path: root/unmaintained/load_test/load_test_meta_tail
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
committerChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
commit6b743dbbf96f863e70ee80e4b32c0928f594891a (patch)
tree0e3551b5c411ad2131a1b5467318fd973bee71b7 /unmaintained/load_test/load_test_meta_tail
parentb9ecf1e3a8685c62ccac80ed0fbc180ed34b48e2 (diff)
downloadseaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.tar.xz
seaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.zip
refactor client subscribe metadata
Diffstat (limited to 'unmaintained/load_test/load_test_meta_tail')
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go35
1 files changed, 2 insertions, 33 deletions
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index 53cb2f912..c521ce33e 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -1,14 +1,12 @@
package main
import (
- "context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
- "io"
"strconv"
"time"
)
@@ -74,38 +72,9 @@ func startGenerateMetadata() {
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
- lastTsNs := int64(0)
+ tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail",
+ *dir, 0, 0, eachEntryFunc, false)
- tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *dir,
- SinceNs: lastTsNs,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- glog.V(0).Infof("tail last record:%+v", time.Unix(0, lastTsNs))
- return err
- }
- lastTsNs = resp.TsNs
- }
-
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}