aboutsummaryrefslogtreecommitdiff
path: root/unmaintained/load_test/load_test_meta_tail
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-22 18:03:13 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-22 18:03:13 -0700
commita852f8a3f569332d3b4ae0abf6668fcf9af33bb5 (patch)
tree8b59b0ae2493b1d3940a8dbebe8ced0f93de20ac /unmaintained/load_test/load_test_meta_tail
parent3d3fa43542dabab77a2c1ade868fcd8dc0e68cd9 (diff)
downloadseaweedfs-a852f8a3f569332d3b4ae0abf6668fcf9af33bb5.tar.xz
seaweedfs-a852f8a3f569332d3b4ae0abf6668fcf9af33bb5.zip
some code to stress test the meta data events
Diffstat (limited to 'unmaintained/load_test/load_test_meta_tail')
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go108
1 files changed, 108 insertions, 0 deletions
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
new file mode 100644
index 000000000..98db80a05
--- /dev/null
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/grpc"
+ "io"
+ "strconv"
+)
+
+var (
+ dir = flag.String("dir", "/tmp", "directory to create files")
+ n = flag.Int("n", 100, "the number of metadata")
+ tailFiler = flag.String("filer", "localhost:8888", "the filer address")
+ isWrite = flag.Bool("write", false, "only write")
+)
+
+func main() {
+
+ flag.Parse()
+
+ if *isWrite {
+ startGenerateMetadata()
+ return
+ }
+
+ expected := 0
+ startSubscribeMetadata(func(event *filer_pb.SubscribeMetadataResponse) error {
+ if event.Directory != *dir {
+ return nil
+ }
+ name := event.EventNotification.NewEntry.Name
+ fmt.Printf("=> %s\n", name)
+ id := name[4:]
+ if x, err := strconv.Atoi(id); err == nil {
+ if x != expected {
+ return fmt.Errorf("Expected file%d Actual %s\n", expected, name)
+ }
+ expected++
+ } else {
+ return err
+ }
+ return nil
+ })
+
+}
+
+func startGenerateMetadata() {
+ pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
+
+ for i := 0; i < *n; i++ {
+ name := fmt.Sprintf("file%d", i)
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: *dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ },
+ }); err != nil {
+ fmt.Printf("create entry %s: %v\n", name, err)
+ return err
+ }
+ }
+
+ return nil
+
+ })
+}
+
+func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
+
+ lastTsNs := int64(0)
+
+ tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "tail",
+ PathPrefix: *dir,
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+ lastTsNs = resp.TsNs
+ }
+
+ })
+ if tailErr != nil {
+ fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
+ }
+}