diff options
| author | bukton <buk_ton2@hotmail.com> | 2020-04-19 00:21:45 +0700 |
|---|---|---|
| committer | bukton <buk_ton2@hotmail.com> | 2020-04-19 00:21:45 +0700 |
| commit | 290c6b7f01f7b148a65ba10dd6536ad2567d7653 (patch) | |
| tree | e1abb141849419c40691097eea032b944fcbd748 /weed/command | |
| parent | 6234ea441b6388838a19635c656316047f42917d (diff) | |
| parent | 11f5a6d91346e5f3cbf3b46e0a660e231c5c2998 (diff) | |
| download | seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.tar.xz seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.zip | |
Merge remote-tracking branch 'origin/master' into filer_mongodb
# Conflicts:
# go.mod
# go.sum
# weed/server/filer_server.go
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_replication.go | 1 | ||||
| -rw-r--r-- | weed/command/master.go | 3 | ||||
| -rw-r--r-- | weed/command/mount.go | 12 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 6 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 28 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 17 | ||||
| -rw-r--r-- | weed/command/shell.go | 15 | ||||
| -rw-r--r-- | weed/command/volume.go | 3 | ||||
| -rw-r--r-- | weed/command/watch.go | 2 | ||||
| -rw-r--r-- | weed/command/webdav.go | 7 |
10 files changed, 55 insertions, 39 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index 737f0d24a..40f2b570b 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -121,7 +121,6 @@ func runFilerReplicate(cmd *Command, args []string) bool { } } - return true } func validateOneEnabledInput(config *viper.Viper) { diff --git a/weed/command/master.go b/weed/command/master.go index 1be60426f..6171f8e83 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -47,7 +47,7 @@ func init() { m.ip = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address") m.ipBind = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") - m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") + m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095") m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") m.volumePreallocate = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.") m.pulseSeconds = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") @@ -147,6 +147,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { + glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers) masterAddress = masterIp + ":" + strconv.Itoa(masterPort) if peers != "" { cleanedPeers = strings.Split(peers, ",") diff --git a/weed/command/mount.go b/weed/command/mount.go index adf384a6f..efa4650ab 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,5 +1,9 @@ package command +import ( + "os" +) + type MountOptions struct { filer *string filerMountRootPath *string @@ -9,7 +13,8 @@ type MountOptions struct { replication *string ttlSec *int chunkSizeLimitMB *int - chunkCacheCountLimit *int64 + cacheDir *string + cacheSizeMB *int64 dataCenter *string allowOthers *bool umaskString *string @@ -32,8 +37,9 @@ func init() { mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") - mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 4, "local write buffer size, also chunk large files") - mountOptions.chunkCacheCountLimit = cmdMount.Flag.Int64("chunkCacheCountLimit", 1000, "number of file chunks to cache in memory") + mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files") + mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB (0 will disable cache)") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 148540dec..0f87d6aee 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -129,7 +129,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { } options = append(options, osSpecificMountOptions()...) - if *option.allowOthers { options = append(options, fuse.AllowOther()) } @@ -137,12 +136,12 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { options = append(options, fuse.AllowNonEmptyMount()) } + // mount c, err := fuse.Mount(dir, options...) if err != nil { glog.V(0).Infof("mount: %v", err) return true } - defer fuse.Unmount(dir) util.OnInterrupt(func() { @@ -164,7 +163,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { Replication: *option.replication, TtlSec: int32(*option.ttlSec), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - ChunkCacheCountLimit: *option.chunkCacheCountLimit, + CacheDir: *option.cacheDir, + CacheSizeMB: *option.cacheSizeMB, DataCenter: *option.dataCenter, DirListCacheLimit: *option.dirListCacheLimit, EntryCacheTtl: 3 * time.Second, diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 3e13b4730..67ebdfb6d 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -8,13 +8,12 @@ import ( "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,16 +22,14 @@ var ( ) type QueueOptions struct { - filer *string - port *int - defaultTtl *string + filer *string + port *int } func init() { cmdMsgBroker.Run = runMsgBroker // break init cycle messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") - messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") } var cmdMsgBroker = &Command{ @@ -62,9 +59,8 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { return false } - filerQueuesPath := "/queues" - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + cipher := false for { err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { @@ -72,8 +68,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) } - filerQueuesPath = resp.DirQueues - glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) + cipher = resp.Cipher return nil }) if err != nil { @@ -85,12 +80,13 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { } } - qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ + qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ Filers: []string{*msgBrokerOpt.filer}, DefaultReplication: "", MaxMB: 0, Port: *msgBrokerOpt.port, - }) + Cipher: cipher, + }, grpcDialOption) // start grpc listener grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) @@ -98,7 +94,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool { glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) - queue_pb.RegisterSeaweedQueueServer(grpcS, qs) + messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) reflection.Register(grpcS) grpcS.Serve(grpcL) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index e391c23ea..2c2bdc737 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -18,7 +18,7 @@ var cmdScaffold = &Command{ For example, the filer.toml mysql password can be overwritten by environment variable export WEED_MYSQL_PASSWORD=some_password Environment variable rules: - * Prefix fix with "WEED_" + * Prefix the variable name with "WEED_" * Upppercase the reset of variable name. * Replace '.' with '_' @@ -76,8 +76,10 @@ const ( recursive_delete = false # directories under this folder will be automatically creating a separate bucket buckets_folder = "/buckets" -# directories under this folder will be store message queue data -queues_folder = "/queues" +buckets_fsync = [ # a list of buckets with all write requests fsync=true + "important_bucket", + "should_always_fsync", +] #################################################### # The following are filer store options @@ -139,13 +141,13 @@ hosts=[ "localhost:9042", ] -[redis] +[redis2] enabled = false address = "localhost:6379" password = "" database = 0 -[redis_cluster] +[redis_cluster2] enabled = false addresses = [ "localhost:30001", @@ -260,6 +262,7 @@ aws_secret_access_key = "" # if empty, loads from the shared credentials fil region = "us-east-2" bucket = "your_bucket_name" # an existing bucket directory = "/" # destination directory +endpoint = "" [sink.google_cloud_storage] # read credentials doc at https://cloud.google.com/docs/authentication/getting-started @@ -358,11 +361,13 @@ scripts = """ ec.rebuild -force ec.balance -force volume.balance -force + volume.fix.replication """ sleep_minutes = 17 # sleep minutes between each script execution [master.filer] -default_filer_url = "http://localhost:8888/" +default = "localhost:8888" # used by maintenance scripts if the scripts needs to use fs related commands + [master.sequencer] type = "memory" # Choose [memory|etcd] type for storing the file id sequence diff --git a/weed/command/shell.go b/weed/command/shell.go index dcf70608f..6dd768f47 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -9,14 +9,14 @@ import ( ) var ( - shellOptions shell.ShellOptions - shellInitialFilerUrl *string + shellOptions shell.ShellOptions + shellInitialFiler *string ) func init() { cmdShell.Run = runShell // break init cycle shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers") - shellInitialFilerUrl = cmdShell.Flag.String("filer.url", "http://localhost:8888/", "initial filer url") + shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port") } var cmdShell = &Command{ @@ -32,12 +32,13 @@ func runShell(command *Command, args []string) bool { util.LoadConfiguration("security", false) shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - var filerPwdErr error - shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl) - if filerPwdErr != nil { - fmt.Printf("failed to parse url filer.url=%s : %v\n", *shellInitialFilerUrl, filerPwdErr) + var err error + shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) + if err != nil { + fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err) return false } + shellOptions.Directory = "/" shell.RunShell(shellOptions) diff --git a/weed/command/volume.go b/weed/command/volume.go index 68a0ce223..936998d82 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -127,7 +127,8 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v } if *v.ip == "" { - *v.ip = "127.0.0.1" + *v.ip = util.DetectedHostAddress() + glog.V(0).Infof("detected volume server ip address: %v", *v.ip) } if *v.publicPort == 0 { diff --git a/weed/command/watch.go b/weed/command/watch.go index 2dd6ec211..966040fbb 100644 --- a/weed/command/watch.go +++ b/weed/command/watch.go @@ -34,7 +34,7 @@ func runWatch(cmd *Command, args []string) bool { watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.ListenForEvents(context.Background(), &filer_pb.ListenForEventsRequest{ + stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ ClientName: "watch", PathPrefix: *watchTarget, SinceNs: 0, diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 4f5d5f5ce..a1616d0fc 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "os/user" "strconv" "time" @@ -26,6 +27,8 @@ type WebDavOption struct { collection *string tlsPrivateKey *string tlsCertificate *string + cacheDir *string + cacheSizeMB *int64 } func init() { @@ -35,6 +38,8 @@ func init() { webDavStandaloneOptions.collection = cmdWebDav.Flag.String("collection", "", "collection to create the files") webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file") webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file") + webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks") + webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB") } var cmdWebDav = &Command{ @@ -105,6 +110,8 @@ func (wo *WebDavOption) startWebDav() bool { Uid: uid, Gid: gid, Cipher: cipher, + CacheDir: *wo.cacheDir, + CacheSizeMB: *wo.cacheSizeMB, }) if webdavServer_err != nil { glog.Fatalf("WebDav Server startup error: %v", webdavServer_err) |
