diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/server.go | 1 | ||||
| -rw-r--r-- | weed/command/volume.go | 8 | ||||
| -rw-r--r-- | weed/command/watch.go | 55 |
3 files changed, 58 insertions, 6 deletions
diff --git a/weed/command/server.go b/weed/command/server.go index d16095075..4aeecbff0 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -98,6 +98,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = &False s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 6fb7447e7..c8f24802c 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -55,6 +55,7 @@ type VolumeServerOptions struct { fileSizeLimitMB *int minFreeSpacePercents []float32 pprof *bool + preStopSeconds *int // pulseSeconds *int } @@ -66,6 +67,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") @@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, ) - // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v fmt.Println("volume server has be killed") var startTime time.Time + // Stop heartbeats + glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) + volumeServer.SendHeartbeat = false + time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) + glog.V(0).Infof("end sleep %d sec", *v.preStopSeconds) // firstly, stop the public http service to prevent from receiving new user request if nil != publicHttpDown { startTime = time.Now() diff --git a/weed/command/watch.go b/weed/command/watch.go index b46707a62..9340db141 100644 --- a/weed/command/watch.go +++ b/weed/command/watch.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "io" + "path/filepath" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/pb" @@ -17,7 +19,7 @@ func init() { } var cmdWatch = &Command{ - UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]", + UsageLine: "watch [-filer=localhost:8888] [-target=/]", Short: "see recent changes on a filer", Long: `See recent changes on a filer. @@ -25,15 +27,55 @@ var cmdWatch = &Command{ } var ( - watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") - watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") - watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") + watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") ) func runWatch(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + var filterFunc func(dir, fname string) bool + if *watchPattern != "" { + if strings.Contains(*watchPattern, "/") { + println("watch path pattern", *watchPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*watchPattern, dir+"/"+fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } else { + println("watch file pattern", *watchPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*watchPattern, fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } + } + + shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { + if filterFunc == nil { + return true + } + if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { + return false + } + if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { + return true + } + if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) { + return true + } + return false + } + watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ @@ -53,7 +95,10 @@ func runWatch(cmd *Command, args []string) bool { if listenErr != nil { return listenErr } - fmt.Printf("events: %+v\n", resp.EventNotification) + if !shouldPrint(resp) { + continue + } + fmt.Printf("%+v\n", resp.EventNotification) } }) |
