aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorbukton <buk_ton2@hotmail.com>2020-04-19 00:21:45 +0700
committerbukton <buk_ton2@hotmail.com>2020-04-19 00:21:45 +0700
commit290c6b7f01f7b148a65ba10dd6536ad2567d7653 (patch)
treee1abb141849419c40691097eea032b944fcbd748 /weed/command
parent6234ea441b6388838a19635c656316047f42917d (diff)
parent11f5a6d91346e5f3cbf3b46e0a660e231c5c2998 (diff)
downloadseaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.tar.xz
seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.zip
Merge remote-tracking branch 'origin/master' into filer_mongodb
# Conflicts: # go.mod # go.sum # weed/server/filer_server.go
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/mount.go12
-rw-r--r--weed/command/mount_std.go6
-rw-r--r--weed/command/msg_broker.go28
-rw-r--r--weed/command/scaffold.go17
-rw-r--r--weed/command/shell.go15
-rw-r--r--weed/command/volume.go3
-rw-r--r--weed/command/watch.go2
-rw-r--r--weed/command/webdav.go7
10 files changed, 55 insertions, 39 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 737f0d24a..40f2b570b 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -121,7 +121,6 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
}
- return true
}
func validateOneEnabledInput(config *viper.Viper) {
diff --git a/weed/command/master.go b/weed/command/master.go
index 1be60426f..6171f8e83 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -47,7 +47,7 @@ func init() {
m.ip = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
- m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
+ m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
@@ -147,6 +147,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
+ glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
masterAddress = masterIp + ":" + strconv.Itoa(masterPort)
if peers != "" {
cleanedPeers = strings.Split(peers, ",")
diff --git a/weed/command/mount.go b/weed/command/mount.go
index adf384a6f..efa4650ab 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,5 +1,9 @@
package command
+import (
+ "os"
+)
+
type MountOptions struct {
filer *string
filerMountRootPath *string
@@ -9,7 +13,8 @@ type MountOptions struct {
replication *string
ttlSec *int
chunkSizeLimitMB *int
- chunkCacheCountLimit *int64
+ cacheDir *string
+ cacheSizeMB *int64
dataCenter *string
allowOthers *bool
umaskString *string
@@ -32,8 +37,9 @@ func init() {
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
- mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files")
- mountOptions.chunkCacheCountLimit = cmdMount.Flag.Int64("chunkCacheCountLimit", 1000, "number of file chunks to cache in memory")
+ mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files")
+ mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 148540dec..0f87d6aee 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -129,7 +129,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
options = append(options, osSpecificMountOptions()...)
-
if *option.allowOthers {
options = append(options, fuse.AllowOther())
}
@@ -137,12 +136,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
options = append(options, fuse.AllowNonEmptyMount())
}
+ // mount
c, err := fuse.Mount(dir, options...)
if err != nil {
glog.V(0).Infof("mount: %v", err)
return true
}
-
defer fuse.Unmount(dir)
util.OnInterrupt(func() {
@@ -164,7 +163,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
- ChunkCacheCountLimit: *option.chunkCacheCountLimit,
+ CacheDir: *option.cacheDir,
+ CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
DirListCacheLimit: *option.dirListCacheLimit,
EntryCacheTtl: 3 * time.Second,
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 3e13b4730..67ebdfb6d 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -8,13 +8,12 @@ import (
"google.golang.org/grpc/reflection"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -23,16 +22,14 @@ var (
)
type QueueOptions struct {
- filer *string
- port *int
- defaultTtl *string
+ filer *string
+ port *int
}
func init() {
cmdMsgBroker.Run = runMsgBroker // break init cycle
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
- messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
}
var cmdMsgBroker = &Command{
@@ -62,9 +59,8 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
return false
}
- filerQueuesPath := "/queues"
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
+ cipher := false
for {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -72,8 +68,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
- filerQueuesPath = resp.DirQueues
- glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
+ cipher = resp.Cipher
return nil
})
if err != nil {
@@ -85,12 +80,13 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
}
}
- qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
+ qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
Filers: []string{*msgBrokerOpt.filer},
DefaultReplication: "",
MaxMB: 0,
Port: *msgBrokerOpt.port,
- })
+ Cipher: cipher,
+ }, grpcDialOption)
// start grpc listener
grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
@@ -98,7 +94,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
- queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
+ messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
reflection.Register(grpcS)
grpcS.Serve(grpcL)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index e391c23ea..2c2bdc737 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -18,7 +18,7 @@ var cmdScaffold = &Command{
For example, the filer.toml mysql password can be overwritten by environment variable
export WEED_MYSQL_PASSWORD=some_password
Environment variable rules:
- * Prefix fix with "WEED_"
+ * Prefix the variable name with "WEED_"
* Upppercase the reset of variable name.
* Replace '.' with '_'
@@ -76,8 +76,10 @@ const (
recursive_delete = false
# directories under this folder will be automatically creating a separate bucket
buckets_folder = "/buckets"
-# directories under this folder will be store message queue data
-queues_folder = "/queues"
+buckets_fsync = [ # a list of buckets with all write requests fsync=true
+ "important_bucket",
+ "should_always_fsync",
+]
####################################################
# The following are filer store options
@@ -139,13 +141,13 @@ hosts=[
"localhost:9042",
]
-[redis]
+[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
-[redis_cluster]
+[redis_cluster2]
enabled = false
addresses = [
"localhost:30001",
@@ -260,6 +262,7 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials fil
region = "us-east-2"
bucket = "your_bucket_name" # an existing bucket
directory = "/" # destination directory
+endpoint = ""
[sink.google_cloud_storage]
# read credentials doc at https://cloud.google.com/docs/authentication/getting-started
@@ -358,11 +361,13 @@ scripts = """
ec.rebuild -force
ec.balance -force
volume.balance -force
+ volume.fix.replication
"""
sleep_minutes = 17 # sleep minutes between each script execution
[master.filer]
-default_filer_url = "http://localhost:8888/"
+default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands
+
[master.sequencer]
type = "memory" # Choose [memory|etcd] type for storing the file id sequence
diff --git a/weed/command/shell.go b/weed/command/shell.go
index dcf70608f..6dd768f47 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -9,14 +9,14 @@ import (
)
var (
- shellOptions shell.ShellOptions
- shellInitialFilerUrl *string
+ shellOptions shell.ShellOptions
+ shellInitialFiler *string
)
func init() {
cmdShell.Run = runShell // break init cycle
shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
- shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url")
+ shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
}
var cmdShell = &Command{
@@ -32,12 +32,13 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- var filerPwdErr error
- shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
- if filerPwdErr != nil {
- fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr)
+ var err error
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
+ if err != nil {
+ fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
return false
}
+ shellOptions.Directory = "/"
shell.RunShell(shellOptions)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 68a0ce223..936998d82 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -127,7 +127,8 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
if *v.ip == "" {
- *v.ip = "127.0.0.1"
+ *v.ip = util.DetectedHostAddress()
+ glog.V(0).Infof("detected volume server ip address: %v", *v.ip)
}
if *v.publicPort == 0 {
diff --git a/weed/command/watch.go b/weed/command/watch.go
index 2dd6ec211..966040fbb 100644
--- a/weed/command/watch.go
+++ b/weed/command/watch.go
@@ -34,7 +34,7 @@ func runWatch(cmd *Command, args []string) bool {
watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.ListenForEvents(context.Background(), &filer_pb.ListenForEventsRequest{
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
ClientName: "watch",
PathPrefix: *watchTarget,
SinceNs: 0,
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 4f5d5f5ce..a1616d0fc 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
+ "os"
"os/user"
"strconv"
"time"
@@ -26,6 +27,8 @@ type WebDavOption struct {
collection *string
tlsPrivateKey *string
tlsCertificate *string
+ cacheDir *string
+ cacheSizeMB *int64
}
func init() {
@@ -35,6 +38,8 @@ func init() {
webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files")
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
+ webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
+ webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
}
var cmdWebDav = &Command{
@@ -105,6 +110,8 @@ func (wo *WebDavOption) startWebDav() bool {
Uid: uid,
Gid: gid,
Cipher: cipher,
+ CacheDir: *wo.cacheDir,
+ CacheSizeMB: *wo.cacheSizeMB,
})
if webdavServer_err != nil {
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)