aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_meta_tail.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_meta_tail.go')
-rw-r--r--weed/command/filer_meta_tail.go36
1 files changed, 8 insertions, 28 deletions
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 76699bb5e..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
- "io"
"os"
"path/filepath"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -104,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
- continue
+ return nil
}
- if err = eachEntryFunc(resp); err != nil {
+ if err := eachEntryFunc(resp); err != nil {
return err
}
- }
+ return nil
+ }, false)
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}