diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/command.go | 3 | ||||
| -rw-r--r-- | weed/command/filer.go | 7 | ||||
| -rw-r--r-- | weed/command/filer_cat.go | 118 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 201 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 3 | ||||
| -rw-r--r-- | weed/command/mount.go | 2 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 5 | ||||
| -rw-r--r-- | weed/command/s3.go | 17 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 84 | ||||
| -rw-r--r-- | weed/command/server.go | 10 | ||||
| -rw-r--r-- | weed/command/shell.go | 25 | ||||
| -rw-r--r-- | weed/command/upload.go | 25 | ||||
| -rw-r--r-- | weed/command/watch.go | 113 |
14 files changed, 475 insertions, 140 deletions
diff --git a/weed/command/command.go b/weed/command/command.go index 0df22b575..3fa52c922 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -15,6 +15,8 @@ var Commands = []*Command{ cmdDownload, cmdExport, cmdFiler, + cmdFilerCat, + cmdFilerMetaTail, cmdFilerReplicate, cmdFilerSynchronize, cmdFix, @@ -25,7 +27,6 @@ var Commands = []*Command{ cmdScaffold, cmdServer, cmdShell, - cmdWatch, cmdUpload, cmdVersion, cmdVolume, diff --git a/weed/command/filer.go b/weed/command/filer.go index a3008eb29..633c25cac 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -42,7 +42,7 @@ type FilerOptions struct { cipher *bool peers *string metricsHttpPort *int - cacheToFilerLimit *int + saveToFilerLimit *int defaultLevelDbDirectory *string } @@ -64,7 +64,7 @@ func init() { f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers") f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") - f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") // start s3 on filer @@ -74,6 +74,7 @@ func init() { filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") + filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") } var cmdFiler = &Command{ @@ -148,7 +149,7 @@ func (fo *FilerOptions) startFiler() { Host: *fo.ip, Port: uint32(*fo.port), Cipher: *fo.cipher, - CacheToFilerLimit: int64(*fo.cacheToFilerLimit), + SaveToFilerLimit: *fo.saveToFilerLimit, Filers: peers, }) if nfs_err != nil { diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go new file mode 100644 index 000000000..a46098b04 --- /dev/null +++ b/weed/command/filer_cat.go @@ -0,0 +1,118 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "google.golang.org/grpc" + "math" + "net/url" + "os" + "strings" + + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + filerCat FilerCatOptions +) + +type FilerCatOptions struct { + grpcDialOption grpc.DialOption + filerAddress string + filerClient filer_pb.SeaweedFilerClient + output *string +} + +func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return func(fileId string) (targetUrls []string, err error) { + vid := filer.VolumeId(fileId) + resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return nil, err + } + locations := resp.LocationsMap[vid] + for _, loc := range locations.Locations { + targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId)) + } + return + } +} + +func init() { + cmdFilerCat.Run = runFilerCat // break init cycle + filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout") +} + +var cmdFilerCat = &Command{ + UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file", + Short: "copy one file to local", + Long: `read one file to stdout or write to a file + +`, +} + +func runFilerCat(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + if len(args) == 0 { + return false + } + filerSource := args[len(args)-1] + + filerUrl, err := url.Parse(filerSource) + if err != nil { + fmt.Printf("The last argument should be a URL on filer: %v\n", err) + return false + } + urlPath := filerUrl.Path + if strings.HasSuffix(urlPath, "/") { + fmt.Printf("The last argument should be a file: %v\n", err) + return false + } + + filerCat.filerAddress = filerUrl.Host + filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + dir, name := util.FullPath(urlPath).DirAndName() + + writer := os.Stdout + if *filerCat.output != "" { + + fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output) + + f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + fmt.Printf("open file %s: %v\n", *filerCat.output, err) + return false + } + defer f.Close() + writer = f + } + + pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Name: name, + Directory: dir, + } + respLookupEntry, err := filer_pb.LookupEntry(client, request) + if err != nil { + return err + } + + filerCat.filerClient = client + + return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + + }) + + return true +} diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 93248f357..b95df696c 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool { } urlPath := filerUrl.Path if !strings.HasSuffix(urlPath, "/") { - fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err) + fmt.Printf("The last argument should be a folder and end with \"/\"\n") return false } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go new file mode 100644 index 000000000..fa0262160 --- /dev/null +++ b/weed/command/filer_meta_tail.go @@ -0,0 +1,201 @@ +package command + +import ( + "context" + "fmt" + jsoniter "github.com/json-iterator/go" + "github.com/olivere/elastic/v7" + "io" + "path/filepath" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle +} + +var cmdFilerMetaTail = &Command{ + UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]", + Short: "see recent changes on a filer", + Long: `See recent changes on a filer. + + `, +} + +var ( + tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port") + tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") + esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>") + esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name") +) + +func runFilerMetaTail(cmd *Command, args []string) bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + var filterFunc func(dir, fname string) bool + if *tailPattern != "" { + if strings.Contains(*tailPattern, "/") { + println("watch path pattern", *tailPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*tailPattern, dir+"/"+fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } else { + println("watch file pattern", *tailPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*tailPattern, fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } + } + + shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { + if filterFunc == nil { + return true + } + if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { + return false + } + if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { + return true + } + if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) { + return true + } + return false + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification) + return nil + } + if *esServers != "" { + var err error + eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex) + if err != nil { + fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err) + return false + } + } + + tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ + ClientName: "tail", + PathPrefix: *tailTarget, + SinceNs: time.Now().Add(-*tailStart).UnixNano(), + }) + if err != nil { + return fmt.Errorf("listen: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + if !shouldPrint(resp) { + continue + } + if err = eachEntryFunc(resp); err != nil { + return err + } + } + + }) + if tailErr != nil { + fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) + } + + return true +} + +type EsDocument struct { + Dir string `json:"dir,omitempty"` + Name string `json:"name,omitempty"` + IsDirectory bool `json:"isDir,omitempty"` + Size uint64 `json:"size,omitempty"` + Uid uint32 `json:"uid,omitempty"` + Gid uint32 `json:"gid,omitempty"` + UserName string `json:"userName,omitempty"` + Collection string `json:"collection,omitempty"` + Crtime int64 `json:"crtime,omitempty"` + Mtime int64 `json:"mtime,omitempty"` + Mime string `json:"mime,omitempty"` +} + +func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) { + entry := event.NewEntry + dir, name := event.NewParentPath, entry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + esEntry := &EsDocument{ + Dir: dir, + Name: name, + IsDirectory: entry.IsDirectory, + Size: entry.Attributes.FileSize, + Uid: entry.Attributes.Uid, + Gid: entry.Attributes.Gid, + UserName: entry.Attributes.UserName, + Collection: entry.Attributes.Collection, + Crtime: entry.Attributes.Crtime, + Mtime: entry.Attributes.Mtime, + Mime: entry.Attributes.Mime, + } + return esEntry, id +} + +func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { + options := []elastic.ClientOptionFunc{} + options = append(options, elastic.SetURL(strings.Split(servers, ",")...)) + options = append(options, elastic.SetSniff(false)) + options = append(options, elastic.SetHealthcheck(false)) + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return func(resp *filer_pb.SubscribeMetadataResponse) error { + event := resp.EventNotification + if event.OldEntry != nil && + (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) { + // delete or not update the same file + dir, name := resp.Directory, event.OldEntry.Name + id := util.Md5String([]byte(util.NewFullPath(dir, name))) + println("delete", id) + _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background()) + return err + } + if event.NewEntry != nil { + // add a new file or update the same file + esEntry, id := toEsEntry(event) + value, err := jsoniter.Marshal(esEntry) + if err != nil { + return err + } + println(string(value)) + _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background()) + return err + } + return nil + }, nil +} diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 40f2b570b..4f698e375 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -14,7 +14,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" "github.com/chrislusf/seaweedfs/weed/replication/sub" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/spf13/viper" ) func init() { @@ -123,7 +122,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } -func validateOneEnabledInput(config *viper.Viper) { +func validateOneEnabledInput(config *util.ViperProxy) { enabledInput := "" for _, input := range sub.NotificationInputs { if config.GetBool("notification." + input.GetName() + ".enabled") { diff --git a/weed/command/mount.go b/weed/command/mount.go index f325cb0a5..fa75919aa 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -43,7 +43,7 @@ func init() { mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") - mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0") + mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0") mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 83cb352ff..9e955e344 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { return true } + util.LoadConfiguration("security", false) // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool @@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { dir := util.ResolvePath(*option.dir) chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB - util.LoadConfiguration("security", false) - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if dir == "" { fmt.Printf("Please specify the mount directory via \"-dir\"") @@ -151,6 +150,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { fuse.MaxReadahead(1024 * 128), fuse.AsyncRead(), fuse.WritebackCache(), + fuse.MaxBackground(128), + fuse.CongestionThreshold(128), } options = append(options, osSpecificMountOptions()...) diff --git a/weed/command/s3.go b/weed/command/s3.go index ed5bb0b80..d8e3e306b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -23,13 +23,14 @@ var ( ) type S3Options struct { - filer *string - port *int - config *string - domainName *string - tlsPrivateKey *string - tlsCertificate *string - metricsHttpPort *int + filer *string + port *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string + metricsHttpPort *int + allowEmptyFolder *bool } func init() { @@ -41,6 +42,7 @@ func init() { s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders") } var cmdS3 = &Command{ @@ -181,6 +183,7 @@ func (s3opt *S3Options) startS3Server() bool { DomainName: *s3opt.domainName, BucketsPath: filerBucketsPath, GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 6cfd46427..8b74274e5 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool { content = SECURITY_TOML_EXAMPLE case "master": content = MASTER_TOML_EXAMPLE + case "shell": + content = SHELL_TOML_EXAMPLE } if content == "" { println("need a valid -config option") @@ -85,9 +87,21 @@ buckets_folder = "/buckets" # local on disk, mostly for simple single-machine setup, fairly scalable # faster than previous leveldb, recommended. enabled = true -dir = "." # directory to store level db files +dir = "./filerldb2" # directory to store level db files -[mysql] # or tidb +[leveldb3] +# similar to leveldb2. +# each bucket has its own meta store. +enabled = false +dir = "./filerldb3" # directory to store level db files + +[rocksdb] +# local on disk, similar to leveldb +# since it is using a C wrapper, you need to install rocksdb and build it by yourself +enabled = false +dir = "./filerrdb" # directory to store rocksdb files + +[mysql] # or memsql, tidb # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', # name VARCHAR(1000) COMMENT 'directory or file name', @@ -104,9 +118,31 @@ password = "" database = "" # create or use an existing database connection_max_idle = 2 connection_max_open = 100 +connection_max_lifetime_seconds = 0 +interpolateParams = false + +[mysql2] # or memsql, tidb +enabled = false +createTable = """ + CREATE TABLE IF NOT EXISTS %s ( + dirhash BIGINT, + name VARCHAR(1000), + directory TEXT, + meta LONGBLOB, + PRIMARY KEY (dirhash, name) + ) DEFAULT CHARSET=utf8; +""" +hostname = "localhost" +port = 3306 +username = "root" +password = "" +database = "" # create or use an existing database +connection_max_idle = 2 +connection_max_open = 100 +connection_max_lifetime_seconds = 0 interpolateParams = false -[postgres] # or cockroachdb +[postgres] # or cockroachdb, YugabyteDB # CREATE TABLE IF NOT EXISTS filemeta ( # dirhash BIGINT, # name VARCHAR(65535), @@ -119,7 +155,29 @@ hostname = "localhost" port = 5432 username = "postgres" password = "" -database = "" # create or use an existing database +database = "postgres" # create or use an existing database +schema = "" +sslmode = "disable" +connection_max_idle = 100 +connection_max_open = 100 + +[postgres2] +enabled = false +createTable = """ + CREATE TABLE IF NOT EXISTS %s ( + dirhash BIGINT, + name VARCHAR(65535), + directory VARCHAR(65535), + meta bytea, + PRIMARY KEY (dirhash, name) + ); +""" +hostname = "localhost" +port = 5432 +username = "postgres" +password = "" +database = "postgres" # create or use an existing database +schema = "" sslmode = "disable" connection_max_idle = 100 connection_max_open = 100 @@ -166,9 +224,9 @@ addresses = [ ] password = "" # allows reads from slave servers or the master, but all writes still go to the master -readOnly = true +readOnly = false # automatically use the closest Redis server for reads -routeByLatency = true +routeByLatency = false # This changes the data layout. Only add new directories. Removing/Updating will cause data loss. superLargeDirectories = [] @@ -460,4 +518,18 @@ copy_other = 1 # create n x 1 = n actual volumes treat_replication_as_minimums = false ` + SHELL_TOML_EXAMPLE = ` + +[cluster] +default = "c1" + +[cluster.c1] +master = "localhost:9333" # comma-separated master servers +filer = "localhost:8888" # filer host and port + +[cluster.c2] +master = "" +filer = "" + +` ) diff --git a/weed/command/server.go b/weed/command/server.go index 7e63f8e8a..9976db2ea 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -61,6 +61,7 @@ var ( serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server") isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -94,7 +95,7 @@ func init() { filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size") filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list") - filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") @@ -113,6 +114,7 @@ func init() { s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") + s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") @@ -223,7 +225,11 @@ func runServer(cmd *Command, args []string) bool { } - startMaster(masterOptions, serverWhiteList) + if *isStartingMasterServer { + go startMaster(masterOptions, serverWhiteList) + } + + select {} return true } diff --git a/weed/command/shell.go b/weed/command/shell.go index 6dd768f47..c9976e809 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -11,12 +11,14 @@ import ( var ( shellOptions shell.ShellOptions shellInitialFiler *string + shellCluster *string ) func init() { cmdShell.Run = runShell // break init cycle - shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") - shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port") + shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333") + shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888") + shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml") } var cmdShell = &Command{ @@ -24,6 +26,8 @@ var cmdShell = &Command{ Short: "run interactive administrative commands", Long: `run interactive administrative commands. + Generate shell.toml via "weed scaffold -config=shell" + `, } @@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + if *shellOptions.Masters == "" && *shellInitialFiler == "" { + util.LoadConfiguration("shell", false) + v := util.GetViper() + cluster := v.GetString("cluster.default") + if *shellCluster != "" { + cluster = *shellCluster + } + if cluster == "" { + *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888" + } else { + *shellOptions.Masters = v.GetString("cluster." + cluster + ".master") + *shellInitialFiler = v.GetString("cluster." + cluster + ".filer") + } + } + + fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) + var err error shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) if err != nil { diff --git a/weed/command/upload.go b/weed/command/upload.go index 45b15535b..7115da587 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -1,8 +1,12 @@ package command import ( + "context" "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" "os" "path/filepath" @@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master) + if err != nil { + fmt.Printf("upload: %v", err) + return false + } + if *upload.replication == "" { + *upload.replication = defaultCollection + } + if len(args) == 0 { if *upload.dir == "" { return false @@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool { } return true } + +func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) { + err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", masterAddress, err) + } + replication = resp.DefaultReplication + return nil + }) + return +} diff --git a/weed/command/watch.go b/weed/command/watch.go deleted file mode 100644 index fd7dd6fb2..000000000 --- a/weed/command/watch.go +++ /dev/null @@ -1,113 +0,0 @@ -package command - -import ( - "context" - "fmt" - "io" - "path/filepath" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func init() { - cmdWatch.Run = runWatch // break init cycle -} - -var cmdWatch = &Command{ - UsageLine: "watch [-filer=localhost:8888] [-target=/]", - Short: "see recent changes on a filer", - Long: `See recent changes on a filer. - - `, -} - -var ( - watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") - watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") - watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") - watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") -) - -func runWatch(cmd *Command, args []string) bool { - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - - var filterFunc func(dir, fname string) bool - if *watchPattern != "" { - if strings.Contains(*watchPattern, "/") { - println("watch path pattern", *watchPattern) - filterFunc = func(dir, fname string) bool { - matched, err := filepath.Match(*watchPattern, dir+"/"+fname) - if err != nil { - fmt.Printf("error: %v", err) - } - return matched - } - } else { - println("watch file pattern", *watchPattern) - filterFunc = func(dir, fname string) bool { - matched, err := filepath.Match(*watchPattern, fname) - if err != nil { - fmt.Printf("error: %v", err) - } - return matched - } - } - } - - shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { - if filterFunc == nil { - return true - } - if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { - return false - } - if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { - return true - } - if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) { - return true - } - return false - } - - watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "watch", - PathPrefix: *watchTarget, - SinceNs: time.Now().Add(-*watchStart).UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - if !shouldPrint(resp) { - continue - } - fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification) - } - - }) - if watchErr != nil { - fmt.Printf("watch %s: %v\n", *watchFiler, watchErr) - } - - return true -} |
