aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-08-31 18:02:11 +0800
committerGitHub <noreply@github.com>2020-08-31 18:02:11 +0800
commit44a56b158e4637bd70d3fcf8ddc9107973b60558 (patch)
tree4cf59d290d346c6ea06d617531c90d2653f3bc03 /weed/command
parentb0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (diff)
parent408e339c53b9b6626e81f1c3f0f2399494bf4ce6 (diff)
downloadseaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.tar.xz
seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.zip
Merge pull request #13 from chrislusf/master
sync
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/command/volume.go8
-rw-r--r--weed/command/watch.go55
3 files changed, 58 insertions, 6 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index d16095075..4aeecbff0 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -98,6 +98,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
+ serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = &False
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 6fb7447e7..c8f24802c 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -55,6 +55,7 @@ type VolumeServerOptions struct {
fileSizeLimitMB *int
minFreeSpacePercents []float32
pprof *bool
+ preStopSeconds *int
// pulseSeconds *int
}
@@ -66,6 +67,7 @@ func init() {
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
+ v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
@@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
)
-
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
fmt.Println("volume server has be killed")
var startTime time.Time
+ // Stop heartbeats
+ glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
+ volumeServer.SendHeartbeat = false
+ time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
+ glog.V(0).Infof("end sleep %d sec", *v.preStopSeconds)
// firstly, stop the public http service to prevent from receiving new user request
if nil != publicHttpDown {
startTime = time.Now()
diff --git a/weed/command/watch.go b/weed/command/watch.go
index b46707a62..9340db141 100644
--- a/weed/command/watch.go
+++ b/weed/command/watch.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"io"
+ "path/filepath"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -17,7 +19,7 @@ func init() {
}
var cmdWatch = &Command{
- UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]",
+ UsageLine: "watch [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
@@ -25,15 +27,55 @@ var cmdWatch = &Command{
}
var (
- watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
- watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
- watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
)
func runWatch(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var filterFunc func(dir, fname string) bool
+ if *watchPattern != "" {
+ if strings.Contains(*watchPattern, "/") {
+ println("watch path pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ } else {
+ println("watch file pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ }
+ }
+
+ shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
+ if filterFunc == nil {
+ return true
+ }
+ if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ return false
+ }
+ if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
+ return true
+ }
+ if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
+ return true
+ }
+ return false
+ }
+
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
@@ -53,7 +95,10 @@ func runWatch(cmd *Command, args []string) bool {
if listenErr != nil {
return listenErr
}
- fmt.Printf("events: %+v\n", resp.EventNotification)
+ if !shouldPrint(resp) {
+ continue
+ }
+ fmt.Printf("%+v\n", resp.EventNotification)
}
})