Diffstat (limited to 'weed/command')
-rw-r--r--  weed/command/command.go        1
-rw-r--r--  weed/command/download.go      14
-rw-r--r--  weed/command/export.go        24
-rw-r--r--  weed/command/filer.go         31
-rw-r--r--  weed/command/filer_copy.go     2
-rw-r--r--  weed/command/filer_sync.go   337
-rw-r--r--  weed/command/fix.go            2
-rw-r--r--  weed/command/master.go         2
-rw-r--r--  weed/command/mount.go          4
-rw-r--r--  weed/command/mount_std.go     34
-rw-r--r--  weed/command/s3.go            24
-rw-r--r--  weed/command/scaffold.go      18
-rw-r--r--  weed/command/server.go        15
-rw-r--r--  weed/command/volume.go        73
-rw-r--r--  weed/command/watch.go         60
15 files changed, 538 insertions, 103 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 9a41a8a7c..0df22b575 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -16,6 +16,7 @@ var Commands = []*Command{
cmdExport,
cmdFiler,
cmdFilerReplicate,
+ cmdFilerSynchronize,
cmdFix,
cmdMaster,
cmdMount,
diff --git a/weed/command/download.go b/weed/command/download.go
index 7d4dad2d4..f7588fbf0 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net/http"
"os"
"path"
"strings"
@@ -59,7 +60,7 @@ func downloadToFile(server, fileId, saveDir string) error {
if err != nil {
return err
}
- defer rc.Close()
+ defer util.CloseResponse(rc)
if filename == "" {
filename = fileId
}
@@ -71,12 +72,11 @@ func downloadToFile(server, fileId, saveDir string) error {
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
- io.Copy(ioutil.Discard, rc)
return err
}
defer f.Close()
if isFileList {
- content, err := ioutil.ReadAll(rc)
+ content, err := ioutil.ReadAll(rc.Body)
if err != nil {
return err
}
@@ -95,7 +95,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
}
} else {
- if _, err = io.Copy(f, rc); err != nil {
+ if _, err = io.Copy(f, rc.Body); err != nil {
return err
}
@@ -108,12 +108,12 @@ func fetchContent(server string, fileId string) (filename string, content []byte
if lookupError != nil {
return "", nil, lookupError
}
- var rc io.ReadCloser
+ var rc *http.Response
if filename, _, rc, e = util.DownloadFile(fileUrl); e != nil {
return "", nil, e
}
- content, e = ioutil.ReadAll(rc)
- rc.Close()
+ defer util.CloseResponse(rc)
+ content, e = ioutil.ReadAll(rc.Body)
return
}
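
Note: the download path now holds the whole *http.Response and releases it with util.CloseResponse. A rough, illustrative sketch of the drain-then-close idiom such a helper typically follows (not the helper's actual source):

    package main

    import (
        "io"
        "io/ioutil"
        "net/http"
    )

    // closeResponse drains any unread body bytes before closing, so the
    // underlying keep-alive connection can be returned to the pool and reused.
    func closeResponse(resp *http.Response) {
        if resp == nil || resp.Body == nil {
            return
        }
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }
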
diff --git a/weed/command/export.go b/weed/command/export.go
index 411d231cb..78d75ef52 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -23,7 +23,7 @@ import (
)
const (
- defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}`
+ defaultFnFormat = `{{.Id}}_{{.Name}}{{.Ext}}`
timeFormat = "2006-01-02T15:04:05"
)
@@ -56,7 +56,7 @@ func init() {
var (
output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
- format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}")
+ format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Id}} {{.Name}} {{.Ext}}")
newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone, e.g. 2006-01-02T15:04:05")
showDeleted = cmdExport.Flag.Bool("deleted", false, "export deleted files. only applies if -o is not specified")
limit = cmdExport.Flag.Int("limit", 0, "only show first n entries if specified")
@@ -70,13 +70,13 @@ var (
localLocation, _ = time.LoadLocation("Local")
)
-func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool) {
+func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version, deleted bool, offset int64, onDiskSize int64) {
key := needle.NewFileIdFromNeedle(vid, n).String()
- size := n.DataSize
+ size := int32(n.DataSize)
if version == needle.Version1 {
- size = n.Size
+ size = int32(n.Size)
}
- fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\n",
+ fmt.Printf("%s\t%s\t%d\t%t\t%s\t%s\t%s\t%t\t%d\t%d\n",
key,
n.Name,
size,
@@ -85,6 +85,8 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
n.LastModifiedString(),
n.Ttl.String(),
deleted,
+ offset,
+ offset+onDiskSize,
)
}
@@ -111,7 +113,7 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
nv, ok := needleMap.Get(n.Id)
glog.V(3).Infof("key %d offset %d size %d disk_size %d compressed %v ok %v nv %+v",
n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed(), ok, nv)
- if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset {
+ if *showDeleted && n.Size > 0 || ok && nv.Size.IsValid() && nv.Offset.ToAcutalOffset() == offset {
if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) {
glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d",
n.LastModified, newerThanUnix)
@@ -124,17 +126,17 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset in
if tarOutputFile != nil {
return writeFile(vid, n)
} else {
- printNeedle(vid, n, scanner.version, false)
+ printNeedle(vid, n, scanner.version, false, offset, n.DiskSize(scanner.version))
return nil
}
}
if !ok {
if *showDeleted && tarOutputFile == nil {
if n.DataSize > 0 {
- printNeedle(vid, n, scanner.version, true)
+ printNeedle(vid, n, scanner.version, true, offset, n.DiskSize(scanner.version))
} else {
n.Name = []byte("*tombstone")
- printNeedle(vid, n, scanner.version, true)
+ printNeedle(vid, n, scanner.version, true, offset, n.DiskSize(scanner.version))
}
}
glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size)
@@ -208,7 +210,7 @@ func runExport(cmd *Command, args []string) bool {
}
if tarOutputFile == nil {
- fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\n")
+ fmt.Printf("key\tname\tsize\tgzip\tmime\tmodified\tttl\tdeleted\tstart\tstop\n")
}
err = storage.ScanVolumeFile(util.ResolvePath(*export.dir), *export.collection, vid, storage.NeedleMapInMemory, volumeFileScanner)
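
Note: the plain-text listing gains two trailing columns, start and stop, computed as the needle's offset and offset plus its on-disk size. Example invocation that prints the listing (the -volumeId flag belongs to the existing export command and is assumed here; the paths are placeholders):

    weed export -dir=/data -collection=pictures -volumeId=7 -deleted
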
diff --git a/weed/command/filer.go b/weed/command/filer.go
index c36c43e93..e885eafc4 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"strconv"
"strings"
@@ -13,11 +14,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- f FilerOptions
+ f FilerOptions
+ filerStartS3 *bool
+ filerS3Options S3Options
)
type FilerOptions struct {
@@ -36,6 +40,7 @@ type FilerOptions struct {
disableHttp *bool
cipher *bool
peers *string
+ metricsHttpPort *int
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -49,7 +54,7 @@ func init() {
f.bindIp = cmdFiler.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
- f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
+ f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
@@ -57,6 +62,15 @@ func init() {
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
+ f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+
+ // start s3 on filer
+ filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
+ filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
+ filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
+ filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
}
var cmdFiler = &Command{
@@ -84,6 +98,17 @@ func runFiler(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
+ go stats_collect.StartMetricsServer(*f.metricsHttpPort)
+
+ if *filerStartS3 {
+ filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port)
+ filerS3Options.filer = &filerAddress
+ go func() {
+ time.Sleep(2 * time.Second)
+ filerS3Options.startS3Server()
+ }()
+ }
+
f.startFiler()
return true
@@ -152,7 +177,7 @@ func (fo *FilerOptions) startFiler() {
// starting grpc server
grpcPort := *fo.port + 10000
- grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
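
Note: with the flags registered above, a single filer process can also expose an S3 gateway and a Prometheus metrics endpoint. A minimal example invocation (8333 is the default S3 port registered above; 9324 is just an example metrics port):

    weed filer -s3 -s3.port=8333 -metricsPort=9324
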
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 2d6ba94d6..88148acc5 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -72,7 +72,7 @@ var cmdCopy = &Command{
If "maxMB" is set to a positive number, files larger than it would be split into chunks.
- `,
+`,
}
func runCopy(cmd *Command, args []string) bool {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
new file mode 100644
index 000000000..af0a624b1
--- /dev/null
+++ b/weed/command/filer_sync.go
@@ -0,0 +1,337 @@
+package command
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication"
+ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+ "google.golang.org/grpc"
+ "io"
+ "strings"
+ "time"
+)
+
+type SyncOptions struct {
+ isActivePassive *bool
+ filerA *string
+ filerB *string
+ aPath *string
+ bPath *string
+ aReplication *string
+ bReplication *string
+ aCollection *string
+ bCollection *string
+ aTtlSec *int
+ bTtlSec *int
+ aDebug *bool
+ bDebug *bool
+}
+
+var (
+ syncOptions SyncOptions
+ syncCpuProfile *string
+ syncMemProfile *string
+)
+
+func init() {
+ cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
+ syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true")
+ syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
+ syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
+ syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
+ syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
+ syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
+ syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
+ syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
+ syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
+ syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
+ syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
+ syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
+ syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
+ syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
+ syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdFilerSynchronize = &Command{
+ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
+ Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters",
+ Long: `continuously synchronize file changes between two active-active or active-passive filers
+
+ filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
+ and write to the other destination. Different from filer.replicate:
+
+ * filer.sync only works between two filers.
+ * filer.sync does not need any special message queue setup.
+ * filer.sync supports both active-active and active-passive modes.
+
+ If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
+ A fresh sync will start from the earliest metadata logs.
+
+`,
+}
+
+func runFilerSynchronize(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
+
+ go func() {
+ for {
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB,
+ *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug)
+ if err != nil {
+ glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+ }()
+
+ if !*syncOptions.isActivePassive {
+ go func() {
+ for {
+ err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA,
+ *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug)
+ if err != nil {
+ glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
+ time.Sleep(2147 * time.Millisecond)
+ }
+ }
+ }()
+ }
+
+ select {}
+
+ return true
+}
+
+func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string,
+ replicationStr, collection string, ttlSec int, debug bool) error {
+
+ // read source filer signature
+ sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
+ if sourceErr != nil {
+ return sourceErr
+ }
+ // read target filer signature
+ targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
+ if targetErr != nil {
+ return targetErr
+ }
+
+ // if first time, start from now
+ // if has previously synced, resume from that point of time
+ sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
+ if err != nil {
+ return err
+ }
+
+ glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
+
+ // create filer sink
+ filerSource := &source.FilerSource{}
+ filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath)
+ filerSink := &filersink.FilerSink{}
+ filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption)
+ filerSink.SetSourceFiler(filerSource)
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ var sourceOldKey, sourceNewKey util.FullPath
+ if message.OldEntry != nil {
+ sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
+ }
+ if message.NewEntry != nil {
+ sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
+ }
+
+ for _, sig := range message.Signatures {
+ if sig == targetFilerSignature && targetFilerSignature != 0 {
+ fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
+ return nil
+ }
+ }
+ if debug {
+ fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
+ }
+
+ if !strings.HasPrefix(resp.Directory, sourcePath) {
+ return nil
+ }
+
+ // handle deletions
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ return nil
+ }
+ key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+
+ // handle new entries
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ return nil
+ }
+ key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ }
+
+ // this is something special?
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+
+ // handle updates
+ if strings.HasPrefix(string(sourceOldKey), sourcePath) {
+ // old key is in the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is also in the watched directory
+ oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
+ foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
+ if foundExisting {
+ return err
+ }
+
+ // not able to find old entry
+ if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
+ return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ }
+
+ // create the new entry
+ newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+
+ } else {
+ // new key is outside of the watched directory
+ key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
+ return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ } else {
+ // old key is outside of the watched directory
+ if strings.HasPrefix(string(sourceNewKey), sourcePath) {
+ // new key is in the watched directory
+ key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
+ return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ } else {
+ // new key is also outside of the watched directory
+ // skip
+ }
+ }
+
+ return nil
+ }
+
+ return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "syncTo_" + targetFiler,
+ PathPrefix: sourcePath,
+ SinceNs: sourceFilerOffsetTsNs,
+ Signature: targetFilerSignature,
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return err
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
+ return err
+ }
+ }
+
+ }
+
+ })
+
+}
+
+const (
+ SyncKeyPrefix = "sync."
+)
+
+func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
+
+ readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+
+ resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+ if len(resp.Value) < 8 {
+ return nil
+ }
+
+ lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
+
+ return nil
+ })
+
+ return
+
+}
+
+func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
+ return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ syncKey := []byte(SyncKeyPrefix + "____")
+ util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
+
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
+
+ resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
+ Key: syncKey,
+ Value: valueBuf,
+ })
+ if err != nil {
+ return err
+ }
+
+ if len(resp.Error) != 0 {
+ return errors.New(resp.Error)
+ }
+
+ return nil
+
+ })
+
+}
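
Note: typical usage follows the flags registered in init(); the hostnames below are placeholders:

    # active-active: keep two clusters' filers in sync in both directions
    weed filer.sync -a=filer1.example.com:8888 -b=filer2.example.com:8888

    # active-passive: one-directional follow from A to B, limited to one subtree
    weed filer.sync -a=filer1.example.com:8888 -b=filer2.example.com:8888 -isActivePassive -a.path=/buckets -b.path=/buckets

Progress is checkpointed on the receiving filer via KvPut, under a key built from SyncKeyPrefix plus the source filer's signature (see readSyncOffset/writeSyncOffset above), so a restarted sync resumes from the last persisted timestamp.
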
diff --git a/weed/command/fix.go b/weed/command/fix.go
index e1455790f..ae9a051b8 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -48,7 +48,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
- if n.Size > 0 && n.Size != types.TombstoneFileSize {
+ if n.Size.IsValid() {
pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
diff --git a/weed/command/master.go b/weed/command/master.go
index a6fe744d7..144962f63 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -57,7 +57,7 @@ func init() {
m.garbageThreshold = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
m.whiteList = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
m.disableHttp = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
- m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address")
+ m.metricsAddress = cmdMaster.Flag.String("metrics.address", "", "Prometheus gateway address <host>:<port>")
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index a0e573423..7bf59cdc7 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -20,6 +20,8 @@ type MountOptions struct {
umaskString *string
nonempty *bool
outsideContainerClusterMode *bool
+ uidMap *string
+ gidMap *string
}
var (
@@ -47,6 +49,8 @@ func init() {
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
+ mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
+ mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
}
var cmdMount = &Command{
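
Note: the new -map.uid/-map.gid flags translate ownership between the local machine and the filer. Example (the -filer and -dir flags belong to the existing mount command and are assumed here):

    # files owned by local uid/gid 1000 appear as uid/gid 1001 on the filer
    weed mount -filer=localhost:8888 -dir=/mnt/weed -map.uid=1000:1001 -map.gid=1000:1001
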
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 3975575e9..7c0f56d3a 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -5,8 +5,8 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"os"
- "os/user"
"path"
"runtime"
"strconv"
@@ -86,33 +86,17 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
fuse.Unmount(dir)
- uid, gid := uint32(0), uint32(0)
-
// detect mount folder mode
if *option.dirAutoCreate {
- os.MkdirAll(dir, 0755)
+ os.MkdirAll(dir, os.FileMode(0777)&^umask)
}
- mountMode := os.ModeDir | 0755
fileInfo, err := os.Stat(dir)
- if err == nil {
- mountMode = os.ModeDir | fileInfo.Mode()
- uid, gid = util.GetFileUidGid(fileInfo)
- fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, fileInfo.Mode())
- } else {
- fmt.Printf("can not stat %s\n", dir)
- return false
- }
- if uid == 0 {
- if u, err := user.Current(); err == nil {
- if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
- uid = uint32(parsedId)
- }
- if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
- gid = uint32(parsedId)
- }
- fmt.Printf("current uid=%d gid=%d\n", uid, gid)
- }
+ // mapping uid, gid
+ uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
+ if err != nil {
+ fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
+ return false
}
// Ensure target mount point availability
@@ -166,14 +150,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
EntryCacheTtl: 3 * time.Second,
- MountUid: uid,
- MountGid: gid,
- MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
Umask: umask,
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
Cipher: cipher,
+ UidGidMapper: uidGidMapper,
})
// mount
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 92f13673c..e94decaf3 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -22,12 +23,13 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
}
func init() {
@@ -38,6 +40,7 @@ func init() {
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
+ s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
}
var cmdS3 = &Command{
@@ -112,6 +115,8 @@ func runS3(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
+ go stats_collect.StartMetricsServer(*s3StandaloneOptions.metricsHttpPort)
+
return s3StandaloneOptions.startS3Server()
}
@@ -128,6 +133,10 @@ func (s3opt *S3Options) startS3Server() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ // metrics read from the filer
+ var metricsAddress string
+ var metricsIntervalSec int
+
for {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
@@ -135,6 +144,7 @@ func (s3opt *S3Options) startS3Server() bool {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
filerBucketsPath = resp.DirBuckets
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
return nil
})
@@ -147,6 +157,8 @@ func (s3opt *S3Options) startS3Server() bool {
}
}
+ go stats_collect.LoopPushingMetric("s3", stats_collect.SourceName(uint32(*s3opt.port)), metricsAddress, metricsIntervalSec)
+
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
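
Note: the standalone gateway can now serve Prometheus metrics locally and also pushes metrics using the address and interval read from the filer's configuration. Example invocation (the -filer flag belongs to the existing s3 command and is assumed here; 9327 is just an example port):

    weed s3 -filer=localhost:8888 -metricsPort=9327
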
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index b199f2d2d..c36e4a25f 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -140,6 +140,8 @@ keyspace="seaweedfs"
hosts=[
"localhost:9042",
]
+username=""
+password=""
[redis2]
enabled = false
@@ -173,6 +175,20 @@ enabled = false
uri = "mongodb://localhost:27017"
option_pool_size = 0
database = "seaweedfs"
+
+[elastic7]
+enabled = false
+servers = [
+ "http://localhost1:9200",
+ "http://localhost2:9200",
+ "http://localhost3:9200",
+]
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
+# increase the value is recommend, be sure the value in Elastic is greater or equal here
+index.max_result_window = 10000
`
NOTIFICATION_TOML_EXAMPLE = `
@@ -377,7 +393,7 @@ default = "localhost:8888" # used by maintenance scripts if the scripts needs
[master.sequencer]
-type = "memory" # Choose [memory|etcd] type for storing the file id sequence
+type = "raft" # Choose [raft|etcd] type for storing the file id sequence
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
# example : http://127.0.0.1:2379,http://127.0.0.1:2389
sequencer_etcd_urls = "http://127.0.0.1:2379"
diff --git a/weed/command/server.go b/weed/command/server.go
index 565563c77..7efc45475 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"os"
"runtime"
"runtime/pprof"
@@ -56,6 +57,7 @@ var (
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.")
volumeMinFreeSpacePercent = cmdServer.Flag.String("volume.minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly.")
+ serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
@@ -83,7 +85,7 @@ func init() {
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
- filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
+ filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
@@ -96,9 +98,10 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
- serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
- serverOptions.v.pprof = &False
+ serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
+ serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
@@ -135,6 +138,7 @@ func runServer(cmd *Command, args []string) bool {
peers := strings.Join(peerList, ",")
masterOptions.peers = &peers
+ // ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
filerOptions.masters = &peers
@@ -161,11 +165,8 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
- if *filerOptions.defaultReplicaPlacement == "" {
- *filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
- }
-
runtime.GOMAXPROCS(runtime.NumCPU())
+ go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
folders := strings.Split(*volumeDataFolders, ",")
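
Note: all of the flags below are registered above in this file; 9323 is just an example metrics port:

    # all-in-one server with an embedded filer, a metrics endpoint, and the new 1024MB file size limit
    weed server -dir=/data -filer -metricsPort=9323 -volume.fileSizeLimitMB=1024
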
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 4f04a467d..dfc649ba5 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -25,6 +25,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -55,6 +56,8 @@ type VolumeServerOptions struct {
fileSizeLimitMB *int
minFreeSpacePercents []float32
pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
}
@@ -66,6 +69,7 @@ func init() {
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
+ v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
@@ -76,8 +80,9 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
- v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
+ v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
}
var cmdVolume = &Command{
@@ -107,6 +112,8 @@ func runVolume(cmd *Command, args []string) bool {
grace.SetupProfiling(*v.cpuProfile, *v.memProfile)
}
+ go stats_collect.StartMetricsServer(*v.metricsHttpPort)
+
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent)
return true
@@ -206,7 +213,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
)
-
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -222,47 +228,48 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
- stopChain := make(chan struct{})
+ stopChan := make(chan bool)
grace.OnInterrupt(func() {
fmt.Println("volume server has be killed")
- var startTime time.Time
-
- // firstly, stop the public http service to prevent from receiving new user request
- if nil != publicHttpDown {
- startTime = time.Now()
- if err := publicHttpDown.Stop(); err != nil {
- glog.Warningf("stop the public http server failed, %v", err)
- }
- delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("stop public http server, elapsed %dms", delta)
- }
- startTime = time.Now()
- if err := clusterHttpServer.Stop(); err != nil {
- glog.Warningf("stop the cluster http server failed, %v", err)
+ // Stop heartbeats
+ if !volumeServer.StopHeartbeat() {
+ glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds)
+ time.Sleep(time.Duration(*v.preStopSeconds) * time.Second)
}
- delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
- startTime = time.Now()
- grpcS.GracefulStop()
- delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+ shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer)
+ stopChan <- true
+ })
- startTime = time.Now()
- volumeServer.Shutdown()
- delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
- glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+ select {
+ case <-stopChan:
+ }
- pprof.StopCPUProfile()
+}
- close(stopChain) // notify exit
- })
+func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) {
- select {
- case <-stopChain:
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ glog.V(0).Infof("stop public http server ... ")
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ }
+
+ glog.V(0).Infof("graceful stop cluster http server ... ")
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
}
- glog.Warningf("the volume server exit.")
+
+ glog.V(0).Infof("graceful stop gRPC ...")
+ grpcS.GracefulStop()
+
+ volumeServer.Shutdown()
+
+ pprof.StopCPUProfile()
+
}
// check whether configure the public port
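
Note: on interrupt the server now stops heartbeats first and waits -preStopSeconds (default 10) before tearing down the HTTP, gRPC, and storage layers, giving the master time to stop routing traffic to it. Example (the -dir flag belongs to the existing volume command and is assumed here; 9325 is just an example metrics port):

    weed volume -mserver=localhost:9333 -dir=/data -preStopSeconds=30 -metricsPort=9325
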
diff --git a/weed/command/watch.go b/weed/command/watch.go
index b46707a62..fd7dd6fb2 100644
--- a/weed/command/watch.go
+++ b/weed/command/watch.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"io"
+ "path/filepath"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -17,7 +19,7 @@ func init() {
}
var cmdWatch = &Command{
- UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]",
+ UsageLine: "watch [-filer=localhost:8888] [-target=/]",
Short: "see recent changes on a filer",
Long: `See recent changes on a filer.
@@ -25,18 +27,61 @@ var cmdWatch = &Command{
}
var (
- watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
- watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
- watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
)
func runWatch(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var filterFunc func(dir, fname string) bool
+ if *watchPattern != "" {
+ if strings.Contains(*watchPattern, "/") {
+ println("watch path pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ } else {
+ println("watch file pattern", *watchPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*watchPattern, fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ }
+ }
+
+ shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
+ if filterFunc == nil {
+ return true
+ }
+ if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ return false
+ }
+ if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
+ return true
+ }
+ if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
+ return true
+ }
+ return false
+ }
+
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "watch",
PathPrefix: *watchTarget,
SinceNs: time.Now().Add(-*watchStart).UnixNano(),
@@ -53,7 +98,10 @@ func runWatch(cmd *Command, args []string) bool {
if listenErr != nil {
return listenErr
}
- fmt.Printf("events: %+v\n", resp.EventNotification)
+ if !shouldPrint(resp) {
+ continue
+ }
+ fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
}
})
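
Note: with the new -pattern flag the watcher prints only events whose directory or file name matches, using path/filepath.Match semantics. Example:

    # print only PDF changes anywhere under /buckets
    weed watch -filer=localhost:8888 -pathPrefix=/buckets -pattern="*.pdf"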