aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/command.go3
-rw-r--r--weed/command/filer.go7
-rw-r--r--weed/command/filer_cat.go118
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/filer_meta_tail.go201
-rw-r--r--weed/command/filer_replication.go3
-rw-r--r--weed/command/mount.go2
-rw-r--r--weed/command/mount_std.go5
-rw-r--r--weed/command/s3.go17
-rw-r--r--weed/command/scaffold.go84
-rw-r--r--weed/command/server.go10
-rw-r--r--weed/command/shell.go25
-rw-r--r--weed/command/upload.go25
-rw-r--r--weed/command/watch.go113
14 files changed, 475 insertions, 140 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 0df22b575..3fa52c922 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,6 +15,8 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerCat,
+ cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
@@ -25,7 +27,6 @@ var Commands = []*Command{
cmdScaffold,
cmdServer,
cmdShell,
- cmdWatch,
cmdUpload,
cmdVersion,
cmdVolume,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a3008eb29..633c25cac 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -42,7 +42,7 @@ type FilerOptions struct {
cipher *bool
peers *string
metricsHttpPort *int
- cacheToFilerLimit *int
+ saveToFilerLimit *int
defaultLevelDbDirectory *string
}
@@ -64,7 +64,7 @@ func init() {
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
- f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer
@@ -74,6 +74,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
+ filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
}
var cmdFiler = &Command{
@@ -148,7 +149,7 @@ func (fo *FilerOptions) startFiler() {
Host: *fo.ip,
Port: uint32(*fo.port),
Cipher: *fo.cipher,
- CacheToFilerLimit: int64(*fo.cacheToFilerLimit),
+ SaveToFilerLimit: *fo.saveToFilerLimit,
Filers: peers,
})
if nfs_err != nil {
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
new file mode 100644
index 000000000..a46098b04
--- /dev/null
+++ b/weed/command/filer_cat.go
@@ -0,0 +1,118 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "google.golang.org/grpc"
+ "math"
+ "net/url"
+ "os"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ filerCat FilerCatOptions
+)
+
+type FilerCatOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress string
+ filerClient filer_pb.SeaweedFilerClient
+ output *string
+}
+
+func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+ return func(fileId string) (targetUrls []string, err error) {
+ vid := filer.VolumeId(fileId)
+ resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return nil, err
+ }
+ locations := resp.LocationsMap[vid]
+ for _, loc := range locations.Locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+ }
+ return
+ }
+}
+
+func init() {
+ cmdFilerCat.Run = runFilerCat // break init cycle
+ filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout")
+}
+
+var cmdFilerCat = &Command{
+ UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file",
+ Short: "copy one file to local",
+ Long: `read one file to stdout or write to a file
+
+`,
+}
+
+func runFilerCat(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ if len(args) == 0 {
+ return false
+ }
+ filerSource := args[len(args)-1]
+
+ filerUrl, err := url.Parse(filerSource)
+ if err != nil {
+ fmt.Printf("The last argument should be a URL on filer: %v\n", err)
+ return false
+ }
+ urlPath := filerUrl.Path
+ if strings.HasSuffix(urlPath, "/") {
+ fmt.Printf("The last argument should be a file: %v\n", err)
+ return false
+ }
+
+ filerCat.filerAddress = filerUrl.Host
+ filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ dir, name := util.FullPath(urlPath).DirAndName()
+
+ writer := os.Stdout
+ if *filerCat.output != "" {
+
+ fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output)
+
+ f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+ if err != nil {
+ fmt.Printf("open file %s: %v\n", *filerCat.output, err)
+ return false
+ }
+ defer f.Close()
+ writer = f
+ }
+
+ pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ filerCat.filerClient = client
+
+ return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+ })
+
+ return true
+}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 93248f357..b95df696c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool {
}
urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
- fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
+ fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
new file mode 100644
index 000000000..fa0262160
--- /dev/null
+++ b/weed/command/filer_meta_tail.go
@@ -0,0 +1,201 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ jsoniter "github.com/json-iterator/go"
+ "github.com/olivere/elastic/v7"
+ "io"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
+}
+
+var cmdFilerMetaTail = &Command{
+ UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
+ Short: "see recent changes on a filer",
+ Long: `See recent changes on a filer.
+
+ `,
+}
+
+var (
+ tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
+ esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
+ esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
+)
+
+func runFilerMetaTail(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var filterFunc func(dir, fname string) bool
+ if *tailPattern != "" {
+ if strings.Contains(*tailPattern, "/") {
+ println("watch path pattern", *tailPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ } else {
+ println("watch file pattern", *tailPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*tailPattern, fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ }
+ }
+
+ shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
+ if filterFunc == nil {
+ return true
+ }
+ if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ return false
+ }
+ if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
+ return true
+ }
+ if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
+ return true
+ }
+ return false
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
+ return nil
+ }
+ if *esServers != "" {
+ var err error
+ eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
+ if err != nil {
+ fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
+ return false
+ }
+ }
+
+ tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "tail",
+ PathPrefix: *tailTarget,
+ SinceNs: time.Now().Add(-*tailStart).UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if !shouldPrint(resp) {
+ continue
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+ }
+
+ })
+ if tailErr != nil {
+ fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
+ }
+
+ return true
+}
+
+type EsDocument struct {
+ Dir string `json:"dir,omitempty"`
+ Name string `json:"name,omitempty"`
+ IsDirectory bool `json:"isDir,omitempty"`
+ Size uint64 `json:"size,omitempty"`
+ Uid uint32 `json:"uid,omitempty"`
+ Gid uint32 `json:"gid,omitempty"`
+ UserName string `json:"userName,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ Crtime int64 `json:"crtime,omitempty"`
+ Mtime int64 `json:"mtime,omitempty"`
+ Mime string `json:"mime,omitempty"`
+}
+
+func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
+ entry := event.NewEntry
+ dir, name := event.NewParentPath, entry.Name
+ id := util.Md5String([]byte(util.NewFullPath(dir, name)))
+ esEntry := &EsDocument{
+ Dir: dir,
+ Name: name,
+ IsDirectory: entry.IsDirectory,
+ Size: entry.Attributes.FileSize,
+ Uid: entry.Attributes.Uid,
+ Gid: entry.Attributes.Gid,
+ UserName: entry.Attributes.UserName,
+ Collection: entry.Attributes.Collection,
+ Crtime: entry.Attributes.Crtime,
+ Mtime: entry.Attributes.Mtime,
+ Mime: entry.Attributes.Mime,
+ }
+ return esEntry, id
+}
+
+func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
+ options := []elastic.ClientOptionFunc{}
+ options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
+ options = append(options, elastic.SetSniff(false))
+ options = append(options, elastic.SetHealthcheck(false))
+ client, err := elastic.NewClient(options...)
+ if err != nil {
+ return nil, err
+ }
+ return func(resp *filer_pb.SubscribeMetadataResponse) error {
+ event := resp.EventNotification
+ if event.OldEntry != nil &&
+ (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
+ // delete or not update the same file
+ dir, name := resp.Directory, event.OldEntry.Name
+ id := util.Md5String([]byte(util.NewFullPath(dir, name)))
+ println("delete", id)
+ _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
+ return err
+ }
+ if event.NewEntry != nil {
+ // add a new file or update the same file
+ esEntry, id := toEsEntry(event)
+ value, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ return err
+ }
+ println(string(value))
+ _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
+ return err
+ }
+ return nil
+ }, nil
+}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 40f2b570b..4f698e375 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -14,7 +14,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
func init() {
@@ -123,7 +122,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
-func validateOneEnabledInput(config *viper.Viper) {
+func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
diff --git a/weed/command/mount.go b/weed/command/mount.go
index f325cb0a5..fa75919aa 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -43,7 +43,7 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
- mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
+ mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 83cb352ff..9e955e344 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return true
}
+ util.LoadConfiguration("security", false)
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
@@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
- util.LoadConfiguration("security", false)
-
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
@@ -151,6 +150,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
fuse.MaxReadahead(1024 * 128),
fuse.AsyncRead(),
fuse.WritebackCache(),
+ fuse.MaxBackground(128),
+ fuse.CongestionThreshold(128),
}
options = append(options, osSpecificMountOptions()...)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index ed5bb0b80..d8e3e306b 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -23,13 +23,14 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
- metricsHttpPort *int
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
+ allowEmptyFolder *bool
}
func init() {
@@ -41,6 +42,7 @@ func init() {
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+ s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
}
var cmdS3 = &Command{
@@ -181,6 +183,7 @@ func (s3opt *S3Options) startS3Server() bool {
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
+ AllowEmptyFolder: *s3opt.allowEmptyFolder,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 6cfd46427..8b74274e5 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = SECURITY_TOML_EXAMPLE
case "master":
content = MASTER_TOML_EXAMPLE
+ case "shell":
+ content = SHELL_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -85,9 +87,21 @@ buckets_folder = "/buckets"
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
enabled = true
-dir = "." # directory to store level db files
+dir = "./filerldb2" # directory to store level db files
-[mysql] # or tidb
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it is using a C wrapper, you need to install rocksdb and build it by yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
+
+[mysql] # or memsql, tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
# name VARCHAR(1000) COMMENT 'directory or file name',
@@ -104,9 +118,31 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+
+[mysql2] # or memsql, tidb
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS %s (
+ dirhash BIGINT,
+ name VARCHAR(1000),
+ directory TEXT,
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+ ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
interpolateParams = false
-[postgres] # or cockroachdb
+[postgres] # or cockroachdb, YugabyteDB
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT,
# name VARCHAR(65535),
@@ -119,7 +155,29 @@ hostname = "localhost"
port = 5432
username = "postgres"
password = ""
-database = "" # create or use an existing database
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+
+[postgres2]
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS %s (
+ dirhash BIGINT,
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
+ meta bytea,
+ PRIMARY KEY (dirhash, name)
+ );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
@@ -166,9 +224,9 @@ addresses = [
]
password = ""
# allows reads from slave servers or the master, but all writes still go to the master
-readOnly = true
+readOnly = false
# automatically use the closest Redis server for reads
-routeByLatency = true
+routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
@@ -460,4 +518,18 @@ copy_other = 1 # create n x 1 = n actual volumes
treat_replication_as_minimums = false
`
+ SHELL_TOML_EXAMPLE = `
+
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
+
+`
)
diff --git a/weed/command/server.go b/weed/command/server.go
index 7e63f8e8a..9976db2ea 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -61,6 +61,7 @@ var (
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server")
isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -94,7 +95,7 @@ func init() {
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
- filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -113,6 +114,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
@@ -223,7 +225,11 @@ func runServer(cmd *Command, args []string) bool {
}
- startMaster(masterOptions, serverWhiteList)
+ if *isStartingMasterServer {
+ go startMaster(masterOptions, serverWhiteList)
+ }
+
+ select {}
return true
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 6dd768f47..c9976e809 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -11,12 +11,14 @@ import (
var (
shellOptions shell.ShellOptions
shellInitialFiler *string
+ shellCluster *string
)
func init() {
cmdShell.Run = runShell // break init cycle
- shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
- shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
+ shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333")
+ shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888")
+ shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml")
}
var cmdShell = &Command{
@@ -24,6 +26,8 @@ var cmdShell = &Command{
Short: "run interactive administrative commands",
Long: `run interactive administrative commands.
+ Generate shell.toml via "weed scaffold -config=shell"
+
`,
}
@@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ if *shellOptions.Masters == "" && *shellInitialFiler == "" {
+ util.LoadConfiguration("shell", false)
+ v := util.GetViper()
+ cluster := v.GetString("cluster.default")
+ if *shellCluster != "" {
+ cluster = *shellCluster
+ }
+ if cluster == "" {
+ *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888"
+ } else {
+ *shellOptions.Masters = v.GetString("cluster." + cluster + ".master")
+ *shellInitialFiler = v.GetString("cluster." + cluster + ".filer")
+ }
+ }
+
+ fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
+
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
if err != nil {
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 45b15535b..7115da587 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -1,8 +1,12 @@
package command
import (
+ "context"
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
"os"
"path/filepath"
@@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ if err != nil {
+ fmt.Printf("upload: %v", err)
+ return false
+ }
+ if *upload.replication == "" {
+ *upload.replication = defaultCollection
+ }
+
if len(args) == 0 {
if *upload.dir == "" {
return false
@@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool {
}
return true
}
+
+func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) {
+ err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
+ }
+ replication = resp.DefaultReplication
+ return nil
+ })
+ return
+}
diff --git a/weed/command/watch.go b/weed/command/watch.go
deleted file mode 100644
index fd7dd6fb2..000000000
--- a/weed/command/watch.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package command
-
-import (
- "context"
- "fmt"
- "io"
- "path/filepath"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- cmdWatch.Run = runWatch // break init cycle
-}
-
-var cmdWatch = &Command{
- UsageLine: "watch [-filer=localhost:8888] [-target=/]",
- Short: "see recent changes on a filer",
- Long: `See recent changes on a filer.
-
- `,
-}
-
-var (
- watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
- watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
- watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
- watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
-)
-
-func runWatch(cmd *Command, args []string) bool {
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
- var filterFunc func(dir, fname string) bool
- if *watchPattern != "" {
- if strings.Contains(*watchPattern, "/") {
- println("watch path pattern", *watchPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- } else {
- println("watch file pattern", *watchPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*watchPattern, fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- }
- }
-
- shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
- if filterFunc == nil {
- return true
- }
- if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
- return false
- }
- if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
- return true
- }
- if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
- return true
- }
- return false
- }
-
- watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "watch",
- PathPrefix: *watchTarget,
- SinceNs: time.Now().Add(-*watchStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if !shouldPrint(resp) {
- continue
- }
- fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
- }
-
- })
- if watchErr != nil {
- fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
- }
-
- return true
-}