diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
| commit | a9e6db1a8e964f10571a5e11c197fd2da141c6f7 (patch) | |
| tree | 0a22db3d6c68178d8e913c1bffd550ae4127a7e1 /weed/command/filer_meta_tail.go | |
| parent | f0455dee683e831487c345a575b7894c0e2bf61a (diff) | |
| parent | 84f05787f8eecfcb61e49882346ad5855b6bb784 (diff) | |
| download | seaweedfs-origin/ftp.tar.xz seaweedfs-origin/ftp.zip | |
Merge branch 'master' into ftporigin/ftp
Diffstat (limited to 'weed/command/filer_meta_tail.go')
| -rw-r--r-- | weed/command/filer_meta_tail.go | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go new file mode 100644 index 000000000..fa0262160 --- /dev/null +++ b/weed/command/filer_meta_tail.go @@ -0,0 +1,201 @@ +package command + +import ( + "context" + "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/olivere/elastic/v7" + "io" + "path/filepath" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle +} + +var cmdFilerMetaTail = &Command{ + UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]", + Short: "see recent changes on a filer", + Long: `See recent changes on a filer. + + `, +} + +var ( + tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port") + tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") + esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>") + esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name") +) + +func runFilerMetaTail(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + var filterFunc func(dir, fname string) bool + if *tailPattern != "" { + if strings.Contains(*tailPattern, "/") { + println("watch path pattern", *tailPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*tailPattern, dir+"/"+fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } else { + println("watch file pattern", *tailPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*tailPattern, fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } + } + + shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { + if filterFunc == nil { + return true + } + if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { + return false + } + if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { + return true + } + if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) { + return true + } + return false + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification) + return nil + } + if *esServers != "" { + var err error + eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex) + if err != nil { + fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err) + return false + } + } + + tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "tail", + PathPrefix: *tailTarget, + SinceNs: time.Now().Add(-*tailStart).UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + if !shouldPrint(resp) { + continue + } + if err = eachEntryFunc(resp); err != nil { + return err + } + } + + }) + if tailErr != nil { + fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) + } + + return true +} + +type EsDocument struct { + Dir string `json:"dir,omitempty"` + Name string `json:"name,omitempty"` + IsDirectory bool `json:"isDir,omitempty"` + Size uint64 `json:"size,omitempty"` + Uid uint32 `json:"uid,omitempty"` + Gid uint32 `json:"gid,omitempty"` + UserName string `json:"userName,omitempty"` + Collection string `json:"collection,omitempty"` + Crtime int64 `json:"crtime,omitempty"` + Mtime int64 `json:"mtime,omitempty"` + Mime string `json:"mime,omitempty"` +} + +func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { + entry := event.NewEntry + dir, name := event.NewParentPath, entry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + esEntry := &EsDocument{ + Dir: dir, + Name: name, + IsDirectory: entry.IsDirectory, + Size: entry.Attributes.FileSize, + Uid: entry.Attributes.Uid, + Gid: entry.Attributes.Gid, + UserName: entry.Attributes.UserName, + Collection: entry.Attributes.Collection, + Crtime: entry.Attributes.Crtime, + Mtime: entry.Attributes.Mtime, + Mime: entry.Attributes.Mime, + } + return esEntry, id +} + +func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { + options := []elastic.ClientOptionFunc{} + options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) + options = append(options, elastic.SetSniff(false)) + options = append(options, elastic.SetHealthcheck(false)) + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return func(resp *filer_pb.SubscribeMetadataResponse) error { + event := resp.EventNotification + if event.OldEntry != nil && + (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { + // delete or not update the same file + dir, name := resp.Directory, event.OldEntry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + println("delete", id) + _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) + return err + } + if event.NewEntry != nil { + // add a new file or update the same file + esEntry, id := toEsEntry(event) + value, err := jsoniter.Marshal(esEntry) + if err != nil { + return err + } + println(string(value)) + _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) + return err + } + return nil + }, nil +} |
