aboutsummaryrefslogtreecommitdiff
path: root/weed/command/watch.go
blob: 8aad2201925a4623322f725ef76b5392dd3a81ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package command

import (
	"context"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// init attaches runWatch to cmdWatch after package-level variables have been
// initialized. Setting Run inside the cmdWatch literal would make cmdWatch
// depend on runWatch, which itself reads the flags registered on cmdWatch.
func init() {
	cmdWatch.Run = runWatch // break init cycle
}

// cmdWatch describes the "watch" subcommand. Its Run function is assigned in
// init rather than here, and its flags are registered on cmdWatch.Flag by the
// package-level flag variables.
var cmdWatch = &Command{
	// The usage line names the flags as they are actually registered
	// (-filer and -pathPrefix).
	UsageLine: "watch [-filer=localhost:8888] [-pathPrefix=/]",
	Short:     "see recent changes on a filer",
	Long: `See recent changes on a filer.

  `,
}

// Command-line flags for the watch command, registered on cmdWatch.Flag.
var (
	// watchFiler is the filer address that runWatch connects to (-filer).
	watchFiler   = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
	// watchTarget is sent as the PathPrefix of the subscription request.
	// Note that the flag is registered as -pathPrefix, not -target.
	watchTarget  = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
	// watchStart is how far back from now the subscription starts (-timeAgo);
	// runWatch subtracts it from time.Now() to compute SinceNs.
	watchStart   = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
	// watchPattern is an optional filepath.Match pattern (-pattern). When it
	// contains "/" it is matched against directory plus name, otherwise
	// against the entry name only. Empty means no filtering.
	watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
)

// runWatch subscribes to metadata change events on the filer given by -filer
// and prints every event that passes the optional -pattern filter.
//
// The subscription covers paths under -pathPrefix and starts -timeAgo before
// now. The function blocks until the stream ends (io.EOF) or fails. Errors,
// including a malformed -pattern, are printed and the function still returns
// true; cmd and args are not used.
func runWatch(cmd *Command, args []string) bool {

	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	var filterFunc func(dir, fname string) bool
	if *watchPattern != "" {
		// Report a malformed pattern once, before subscribing, instead of
		// printing the same error for every received event. filepath.Match
		// documents ErrBadPattern as its only error. The per-event check in
		// newWatchFilter is kept as a fallback in case this probe against an
		// empty name does not catch every malformed pattern.
		if _, err := filepath.Match(*watchPattern, ""); err != nil {
			fmt.Printf("watch pattern %q: %v\n", *watchPattern, err)
			return true
		}
		if strings.Contains(*watchPattern, "/") {
			println("watch path pattern", *watchPattern)
		} else {
			println("watch file pattern", *watchPattern)
		}
		filterFunc = newWatchFilter(*watchPattern)
	}

	// shouldPrint reports whether an event passes the filter: either the old
	// or the new entry must match. Without a filter, every event is printed.
	shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
		if filterFunc == nil {
			return true
		}
		event := resp.EventNotification
		if event == nil {
			// Nothing to match against; also avoids a nil dereference below.
			return false
		}
		if event.OldEntry != nil && filterFunc(resp.Directory, event.OldEntry.Name) {
			return true
		}
		if event.NewEntry != nil && filterFunc(event.NewParentPath, event.NewEntry.Name) {
			return true
		}
		return false
	}

	eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
		fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
		return nil
	}

	watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

		// Cancel the stream's context when this callback returns, whatever
		// the reason.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: "watch",
			PathPrefix: *watchTarget,
			SinceNs:    time.Now().Add(-*watchStart).UnixNano(),
		})
		if err != nil {
			return fmt.Errorf("listen: %v", err)
		}

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
				return nil
			}
			if listenErr != nil {
				return listenErr
			}
			if !shouldPrint(resp) {
				continue
			}
			if err = eachEntryFunc(resp); err != nil {
				return err
			}
		}

	})
	if watchErr != nil {
		fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
	}

	return true
}

// newWatchFilter builds the event filter for a non-empty -pattern value. A
// pattern containing "/" is matched against the entry's full path; any other
// pattern is matched against the entry name alone.
func newWatchFilter(pattern string) func(dir, fname string) bool {
	matchFullPath := strings.Contains(pattern, "/")
	return func(dir, fname string) bool {
		target := fname
		if matchFullPath {
			target = watchFullPath(dir, fname)
		}
		matched, err := filepath.Match(pattern, target)
		if err != nil {
			fmt.Printf("error: %v\n", err)
		}
		return matched
	}
}

// watchFullPath joins a directory and an entry name with exactly one slash.
// A directory that already ends in "/" (for example the root "/") would
// otherwise produce "//name", which a pattern such as "/*.pdf" cannot match.
func watchFullPath(dir, fname string) string {
	return strings.TrimSuffix(dir, "/") + "/" + fname
}