aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/filer.go60
-rw-r--r--weed/command/filer_meta_backup.go21
-rw-r--r--weed/command/filer_meta_tail.go2
-rw-r--r--weed/command/filer_remote_gateway_buckets.go6
-rw-r--r--weed/command/filer_remote_sync_dir.go6
-rw-r--r--weed/command/filer_sync.go6
-rw-r--r--weed/command/iam.go13
-rw-r--r--weed/command/master.go82
-rw-r--r--weed/command/master_follower.go29
-rw-r--r--weed/command/mount.go4
-rw-r--r--weed/command/mount_darwin.go8
-rw-r--r--weed/command/mount_freebsd.go13
-rw-r--r--weed/command/mount_linux.go6
-rw-r--r--weed/command/mount_notsupported.go4
-rw-r--r--weed/command/mount_std.go168
-rw-r--r--weed/command/msg_broker.go2
-rw-r--r--weed/command/s3.go24
-rw-r--r--weed/command/scaffold/filer.toml34
-rw-r--r--weed/command/scaffold/security.toml6
-rw-r--r--weed/command/server.go16
-rw-r--r--weed/command/volume.go25
-rw-r--r--weed/command/webdav.go2
23 files changed, 345 insertions, 194 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index af5919adf..7091463cc 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses())
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 876b1bbf0..0a768944b 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ "net"
"net/http"
"os"
"time"
@@ -12,7 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -28,7 +29,7 @@ var (
)
type FilerOptions struct {
- masters []pb.ServerAddress
+ masters map[string]pb.ServerAddress
mastersString *string
ip *string
bindIp *string
@@ -51,6 +52,7 @@ type FilerOptions struct {
concurrentUploadLimitMB *int
debug *bool
debugPort *int
+ localSocket *string
}
func init() {
@@ -58,7 +60,7 @@ func init() {
f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
- f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to")
+ f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
@@ -76,6 +78,7 @@ func init() {
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
+ f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -96,10 +99,11 @@ func init() {
filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
- filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB")
+ filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
// start iam on filer
filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service")
+ filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address")
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
@@ -139,11 +143,14 @@ func runFiler(cmd *Command, args []string) bool {
if *filerStartS3 {
filerS3Options.filer = &filerAddress
filerS3Options.bindIp = f.bindIp
+ filerS3Options.localFilerSocket = f.localSocket
go func() {
time.Sleep(startDelay * time.Second)
filerS3Options.startS3Server()
}()
startDelay++
+ } else {
+ *f.localSocket = ""
}
if *filerStartWebDav {
@@ -164,7 +171,7 @@ func runFiler(cmd *Command, args []string) bool {
}()
}
- f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses()
+ f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.startFiler()
@@ -182,6 +189,9 @@ func (fo *FilerOptions) startFiler() {
if *fo.portGrpc == 0 {
*fo.portGrpc = 10000 + *fo.port
}
+ if *fo.bindIp == "" {
+ *fo.bindIp = *fo.ip
+ }
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
@@ -210,7 +220,7 @@ func (fo *FilerOptions) startFiler() {
if *fo.publicPort != 0 {
publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort)
glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, 0)
+ publicListener, localPublicListner, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0)
if e != nil {
glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
}
@@ -219,29 +229,61 @@ func (fo *FilerOptions) startFiler() {
glog.Fatalf("Volume server fail to serve public: %v", e)
}
}()
+ if localPublicListner != nil {
+ go func() {
+ if e := http.Serve(localPublicListner, publicVolumeMux); e != nil {
+ glog.Errorf("Volume server fail to serve public: %v", e)
+ }
+ }()
+ }
}
glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
- filerListener, e := util.NewListener(
- util.JoinHostPort(*fo.bindIp, *fo.port),
+ filerListener, filerLocalListener, e := util.NewIpAndLocalListeners(
+ *fo.bindIp, *fo.port,
time.Duration(10)*time.Second,
)
if e != nil {
glog.Fatalf("Filer listener error: %v", e)
}
+ // start on local unix socket
+ if *fo.localSocket == "" {
+ *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
+ if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
+ }
+ }
+ filerSocketListener, err := net.Listen("unix", *fo.localSocket)
+ if err != nil {
+ glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
+ }
+
// starting grpc server
grpcPort := *fo.portGrpc
- grpcL, err := util.NewListener(util.JoinHostPort(*fo.bindIp, grpcPort), 0)
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
go grpcS.Serve(grpcL)
httpS := &http.Server{Handler: defaultMux}
+ go func() {
+ httpS.Serve(filerSocketListener)
+ }()
+ if filerLocalListener != nil {
+ go func() {
+ if err := httpS.Serve(filerLocalListener); err != nil {
+ glog.Errorf("Filer Fail to serve: %v", e)
+ }
+ }()
+ }
if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 56c7f7a8c..b7cb855f9 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -162,24 +162,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
ctx := context.Background()
message := resp.EventNotification
- if message.OldEntry == nil && message.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return nil
- }
- if message.OldEntry == nil && message.NewEntry != nil {
+ } else if filer_pb.IsCreate(resp) {
println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
return store.InsertEntry(ctx, entry)
- }
- if message.OldEntry != nil && message.NewEntry == nil {
+ } else if filer_pb.IsDelete(resp) {
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
- }
- if message.OldEntry != nil && message.NewEntry != nil {
- if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
- println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
- entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
- return store.UpdateEntry(ctx, entry)
- }
+ } else if filer_pb.IsUpdate(resp) {
+ println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.UpdateEntry(ctx, entry)
+ } else {
+ // renaming
println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
return err
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 1158ef1e0..51c4e7128 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -74,7 +74,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
- if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return false
}
if filterFunc == nil {
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index afe640f5f..cc49a1b95 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -174,10 +174,10 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
return handleEtcRemoteChanges(resp)
}
- if message.OldEntry == nil && message.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return nil
}
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(resp) {
if message.NewParentPath == option.bucketsDir {
return handleCreateBucket(message.NewEntry)
}
@@ -212,7 +212,7 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(resp) {
if resp.Directory == option.bucketsDir {
return handleDeleteBucket(message.OldEntry)
}
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index ccedc9d80..5859645e9 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -91,10 +91,10 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
return handleEtcRemoteChanges(resp)
}
- if message.OldEntry == nil && message.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return nil
}
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(resp) {
if !filer.HasData(message.NewEntry) {
return nil
}
@@ -115,7 +115,7 @@ func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string,
}
return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
}
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(resp) {
glog.V(2).Infof("delete: %+v", resp)
dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
if message.OldEntry.IsDirectory {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 172be6a9a..37ce2aa73 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -262,7 +262,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
}
// handle deletions
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(resp) {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
@@ -271,7 +271,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
}
// handle new entries
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(resp) {
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
return nil
}
@@ -280,7 +280,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
}
// this is something special?
- if message.OldEntry == nil && message.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return nil
}
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 8fb14be06..968d23095 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -22,6 +22,7 @@ var (
type IamOptions struct {
filer *string
masters *string
+ ip *string
port *int
}
@@ -29,6 +30,7 @@ func init() {
cmdIam.Run = runIam // break init cycle
iamStandaloneOptions.filer = cmdIam.Flag.String("filer", "localhost:8888", "filer server address")
iamStandaloneOptions.masters = cmdIam.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ iamStandaloneOptions.ip = cmdIam.Flag.String("ip", util.DetectedHostAddress(), "iam server http listen ip address")
iamStandaloneOptions.port = cmdIam.Flag.Int("port", 8111, "iam server http listen port")
}
@@ -65,7 +67,7 @@ func (iamopt *IamOptions) startIamServer() bool {
}
}
- masters := pb.ServerAddresses(*iamopt.masters).ToAddresses()
+ masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters,
@@ -81,12 +83,19 @@ func (iamopt *IamOptions) startIamServer() bool {
httpS := &http.Server{Handler: router}
listenAddress := fmt.Sprintf(":%d", *iamopt.port)
- iamApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
+ iamApiListener, iamApiLocalListener, err := util.NewIpAndLocalListeners(*iamopt.ip, *iamopt.port, time.Duration(10)*time.Second)
if err != nil {
glog.Fatalf("IAM API Server listener on %s error: %v", listenAddress, err)
}
glog.V(0).Infof("Start Seaweed IAM API Server %s at http port %d", util.Version(), *iamopt.port)
+ if iamApiLocalListener != nil {
+ go func() {
+ if err = httpS.Serve(iamApiLocalListener); err != nil {
+ glog.Errorf("IAM API Server Fail to serve: %v", err)
+ }
+ }()
+ }
if err = httpS.Serve(iamApiListener); err != nil {
glog.Fatalf("IAM API Server Fail to serve: %v", err)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index 0f598f2da..e56ee19fe 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,23 +1,25 @@
package command
import (
- "github.com/chrislusf/raft/protobuf"
- stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/gorilla/mux"
- "google.golang.org/grpc/reflection"
"net/http"
"os"
"sort"
"strings"
"time"
+ "github.com/chrislusf/raft/protobuf"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/gorilla/mux"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -44,6 +46,8 @@ type MasterOptions struct {
metricsIntervalSec *int
raftResumeState *bool
metricsHttpPort *int
+ heartbeatInterval *time.Duration
+ electionTimeout *time.Duration
}
func init() {
@@ -51,7 +55,7 @@ func init() {
m.port = cmdMaster.Flag.Int("port", 9333, "http listen port")
m.portGrpc = cmdMaster.Flag.Int("port.grpc", 0, "grpc listen port")
m.ip = cmdMaster.Flag.String("ip", util.DetectedHostAddress(), "master <ip>|<server> address, also used as identifier")
- m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to")
+ m.ipBind = cmdMaster.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
m.metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
m.peers = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
m.volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@@ -65,6 +69,8 @@ func init() {
m.metricsIntervalSec = cmdMaster.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
m.metricsHttpPort = cmdMaster.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
+ m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
+ m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
}
var cmdMaster = &Command{
@@ -120,20 +126,38 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
if *masterOption.portGrpc == 0 {
*masterOption.portGrpc = 10000 + *masterOption.port
}
+ if *masterOption.ipBind == "" {
+ *masterOption.ipBind = *masterOption.ip
+ }
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
+ masterPeers := make(map[string]pb.ServerAddress)
+ for _, peer := range peers {
+ masterPeers[peer.String()] = peer
+ }
+
r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
- masterListener, e := util.NewListener(listeningAddress, 0)
+ masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
+
// start raftServer
- raftServer, err := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
- peers, myMasterAddress, util.ResolvePath(*masterOption.metaFolder), ms.Topo, *masterOption.raftResumeState)
+ raftServerOption := &weed_server.RaftServerOption{
+ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
+ Peers: masterPeers,
+ ServerAddr: myMasterAddress,
+ DataDir: util.ResolvePath(*masterOption.metaFolder),
+ Topo: ms.Topo,
+ RaftResumeState: *masterOption.raftResumeState,
+ HeartbeatInterval: *masterOption.heartbeatInterval,
+ ElectionTimeout: *masterOption.electionTimeout,
+ }
+ raftServer, err := weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}
@@ -141,7 +165,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
// starting grpc server
grpcPort := *masterOption.portGrpc
- grpcL, err := util.NewListener(util.JoinHostPort(*masterOption.ipBind, grpcPort), 0)
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
@@ -150,6 +174,9 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
go grpcS.Serve(grpcL)
go func() {
@@ -164,8 +191,39 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
go ms.MasterClient.KeepConnectedToMaster()
// start http server
+ var (
+ clientCertFile,
+ certFile,
+ keyFile string
+ )
+ useTLS := false
+ useMTLS := false
+
+ if viper.GetString("https.master.key") != "" {
+ useTLS = true
+ certFile = viper.GetString("https.master.cert")
+ keyFile = viper.GetString("https.master.key")
+ }
+
+ if viper.GetString("https.master.ca") != "" {
+ useMTLS = true
+ clientCertFile = viper.GetString("https.master.ca")
+ }
+
httpS := &http.Server{Handler: r}
- go httpS.Serve(masterListener)
+ if masterLocalListner != nil {
+ go httpS.Serve(masterLocalListner)
+ }
+
+ if useMTLS {
+ httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
+ }
+
+ if useTLS {
+ go httpS.ServeTLS(masterListener, certFile, keyFile)
+ } else {
+ go httpS.Serve(masterListener)
+ }
select {}
}
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index 6d7aa2848..ec7d2758f 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -3,17 +3,18 @@ package command
import (
"context"
"fmt"
+ "net/http"
+ "time"
+
"github.com/aws/aws-sdk-go/aws"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
- "net/http"
- "time"
)
var (
@@ -24,7 +25,7 @@ func init() {
cmdMasterFollower.Run = runMasterFollower // break init cycle
mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 0, "grpc listen port")
- mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to")
+ mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
mf.ip = aws.String(util.DetectedHostAddress())
@@ -45,13 +46,13 @@ var cmdMasterFollower = &Command{
Short: "start a master follower",
Long: `start a master follower to provide volume=>location mapping service
- The master follower does not participate in master election.
+ The master follower does not participate in master election.
It just follow the existing masters, and listen for any volume location changes.
In most cases, the master follower is not needed. In big data centers with thousands of volume
servers. In theory, the master may have trouble to keep up with the write requests and read requests.
- The master follower can relieve the master from from read requests, which only needs to
+ The master follower can relieve the master from from read requests, which only needs to
lookup a fileId or volumeId.
The master follower currently can handle fileId lookup requests:
@@ -82,7 +83,7 @@ func runMasterFollower(cmd *Command, args []string) bool {
func startMasterFollower(masterOptions MasterOptions) {
// collect settings from main masters
- masters := pb.ServerAddresses(*mf.peers).ToAddresses()
+ masters := pb.ServerAddresses(*mf.peers).ToAddressMap()
var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
@@ -111,18 +112,22 @@ func startMasterFollower(masterOptions MasterOptions) {
option := masterOptions.toMasterOption(nil)
option.IsFollower = true
+ if *masterOptions.ipBind == "" {
+ *masterOptions.ipBind = *masterOptions.ip
+ }
+
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, option, masters)
listeningAddress := util.JoinHostPort(*masterOptions.ipBind, *masterOptions.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
- masterListener, e := util.NewListener(listeningAddress, 0)
+ masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOptions.ipBind, *masterOptions.port, 0)
if e != nil {
glog.Fatalf("Master startup error: %v", e)
}
// starting grpc server
grpcPort := *masterOptions.portGrpc
- grpcL, err := util.NewListener(util.JoinHostPort(*masterOptions.ipBind, grpcPort), 0)
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOptions.ipBind, grpcPort, 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
@@ -130,12 +135,18 @@ func startMasterFollower(masterOptions MasterOptions) {
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
go grpcS.Serve(grpcL)
go ms.MasterClient.KeepConnectedToMaster()
// start http server
httpS := &http.Server{Handler: r}
+ if masterLocalListner != nil {
+ go httpS.Serve(masterLocalListner)
+ }
go httpS.Serve(masterListener)
select {}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index e54f1f07f..428e073f2 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -11,6 +11,7 @@ type MountOptions struct {
dir *string
dirAutoCreate *bool
collection *string
+ collectionQuota *int
replication *string
diskType *string
ttlSec *int
@@ -44,13 +45,14 @@ func init() {
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
mountOptions.dirAutoCreate = cmdMount.Flag.Bool("dirAutoCreate", false, "auto create the directory to mount to")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
+ mountOptions.collectionQuota = cmdMount.Flag.Int("collectionQuotaMB", 0, "quota for the collection")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.diskType = cmdMount.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
- mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
+ mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 0, "local file chunk cache capacity in MB")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system")
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go
index f0a5581e7..05d6a1bc4 100644
--- a/weed/command/mount_darwin.go
+++ b/weed/command/mount_darwin.go
@@ -1,13 +1,5 @@
package command
-import (
- "github.com/seaweedfs/fuse"
-)
-
-func osSpecificMountOptions() []fuse.MountOption {
- return []fuse.MountOption{}
-}
-
func checkMountPointAvailable(dir string) bool {
return true
}
diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go
deleted file mode 100644
index f0a5581e7..000000000
--- a/weed/command/mount_freebsd.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package command
-
-import (
- "github.com/seaweedfs/fuse"
-)
-
-func osSpecificMountOptions() []fuse.MountOption {
- return []fuse.MountOption{}
-}
-
-func checkMountPointAvailable(dir string) bool {
- return true
-}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
index 25c4f72cf..aebb14e61 100644
--- a/weed/command/mount_linux.go
+++ b/weed/command/mount_linux.go
@@ -6,8 +6,6 @@ import (
"io"
"os"
"strings"
-
- "github.com/seaweedfs/fuse"
)
const (
@@ -137,10 +135,6 @@ func parseInfoFile(r io.Reader) ([]*Info, error) {
return out, nil
}
-func osSpecificMountOptions() []fuse.MountOption {
- return []fuse.MountOption{}
-}
-
func checkMountPointAvailable(dir string) bool {
mountPoint := dir
if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go
index 1e5c9f53d..894c8e313 100644
--- a/weed/command/mount_notsupported.go
+++ b/weed/command/mount_notsupported.go
@@ -1,5 +1,5 @@
-//go:build !linux && !darwin && !freebsd
-// +build !linux,!darwin,!freebsd
+//go:build !linux && !darwin
+// +build !linux,!darwin
package command
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 8f62b4ec9..d865e053f 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -1,34 +1,28 @@
-//go:build linux || darwin || freebsd
-// +build linux darwin freebsd
+//go:build linux || darwin
+// +build linux darwin
package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/mount/unmount"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/hanwen/go-fuse/v2/fuse"
"net/http"
"os"
"os/user"
- "path"
- "path/filepath"
"runtime"
"strconv"
"strings"
- "syscall"
"time"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
-
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/filesys"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
)
@@ -58,27 +52,18 @@ func runMount(cmd *Command, args []string) bool {
return RunMount(&mountOptions, os.FileMode(umask))
}
-func getParentInode(mountDir string) (uint64, error) {
- parentDir := filepath.Clean(filepath.Join(mountDir, ".."))
- fi, err := os.Stat(parentDir)
- if err != nil {
- return 0, err
- }
+func RunMount(option *MountOptions, umask os.FileMode) bool {
- stat, ok := fi.Sys().(*syscall.Stat_t)
- if !ok {
- return 0, nil
+ // basic checks
+ chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
+ if chunkSizeLimitMB <= 0 {
+ fmt.Printf("Please specify a reasonable buffer size.")
+ return false
}
- return stat.Ino, nil
-}
-
-func RunMount(option *MountOptions, umask os.FileMode) bool {
-
+ // try to connect to filer
filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
-
util.LoadConfiguration("security", false)
- // try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
var err error
@@ -103,26 +88,15 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
filerMountRootPath := *option.filerMountRootPath
- dir := util.ResolvePath(*option.dir)
- parentInode, err := getParentInode(dir)
- if err != nil {
- glog.Errorf("failed to retrieve inode for parent directory of %s: %v", dir, err)
- return true
- }
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
+ // clean up mount point
+ dir := util.ResolvePath(*option.dir)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
- chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
- if chunkSizeLimitMB <= 0 {
- fmt.Printf("Please specify a reasonable buffer size.")
- return false
- }
-
- fuse.Unmount(dir)
+ unmount.Unmount(dir)
// detect mount folder mode
if *option.dirAutoCreate {
@@ -130,6 +104,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
}
fileInfo, err := os.Stat(dir)
+ // collect uid, gid
uid, gid := uint32(0), uint32(0)
mountMode := os.ModeDir | 0777
if err == nil {
@@ -141,6 +116,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return false
}
+ // detect uid, gid
if uid == 0 {
if u, err := user.Current(); err == nil {
if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
@@ -166,35 +142,51 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return true
}
- mountName := path.Base(dir)
-
- options := []fuse.MountOption{
- fuse.VolumeName(mountName),
- fuse.FSName(*option.filer + ":" + filerMountRootPath),
- fuse.Subtype("seaweedfs"),
- // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
- fuse.NoAppleXattr(),
- fuse.ExclCreate(),
- fuse.DaemonTimeout("3600"),
- fuse.AllowDev(),
- fuse.AllowSUID(),
- fuse.DefaultPermissions(),
- fuse.MaxReadahead(1024 * 512),
- fuse.AsyncRead(),
- // fuse.WritebackCache(),
- // fuse.MaxBackground(1024),
- // fuse.CongestionThreshold(1024),
- }
-
- options = append(options, osSpecificMountOptions()...)
- if *option.allowOthers {
- options = append(options, fuse.AllowOther())
+ serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
+
+ // mount fuse
+ fuseMountOptions := &fuse.MountOptions{
+ AllowOther: *option.allowOthers,
+ Options: nil,
+ MaxBackground: 128,
+ MaxWrite: 1024 * 1024 * 2,
+ MaxReadAhead: 1024 * 1024 * 2,
+ IgnoreSecurityLabels: false,
+ RememberInodes: false,
+ FsName: serverFriendlyName + ":" + filerMountRootPath,
+ Name: "seaweedfs",
+ SingleThreaded: false,
+ DisableXAttrs: false,
+ Debug: *option.debug,
+ EnableLocks: false,
+ ExplicitDataCacheControl: false,
+ DirectMount: true,
+ DirectMountFlags: 0,
+ //SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
+ //EnableAcl: true,
}
if *option.nonempty {
- options = append(options, fuse.AllowNonEmptyMount())
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
}
if *option.readOnly {
- options = append(options, fuse.ReadOnly())
+ if runtime.GOOS == "darwin" {
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
+ } else {
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
+ }
+ }
+ if runtime.GOOS == "darwin" {
+ // https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
+ ioSizeMB := 1
+ for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
+ ioSizeMB *= 2
+ }
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
+ // fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
+ fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
+ fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
}
// find mount point
@@ -203,9 +195,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
mountRoot = mountRoot[0 : len(mountRoot)-1]
}
- diskType := types.ToDiskType(*option.diskType)
-
- seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
+ seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
MountDirectory: dir,
FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
@@ -213,49 +203,37 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Collection: *option.collection,
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
- DiskType: diskType,
+ DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
+ Quota: int64(*option.collectionQuota) * 1024 * 1024,
MountUid: uid,
MountGid: gid,
MountMode: mountMode,
MountCtime: fileInfo.ModTime(),
MountMtime: time.Now(),
- MountParentInode: parentInode,
Umask: umask,
VolumeServerAccess: *mountOptions.volumeServerAccess,
Cipher: cipher,
UidGidMapper: uidGidMapper,
})
- // mount
- c, err := fuse.Mount(dir, options...)
+ server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
if err != nil {
- glog.V(0).Infof("mount: %v", err)
- return true
+ glog.Fatalf("Mount fail: %v", err)
}
- defer fuse.Unmount(dir)
-
grace.OnInterrupt(func() {
- fuse.Unmount(dir)
- c.Close()
+ unmount.Unmount(dir)
})
- glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
- server := fs.New(c, nil)
- seaweedFileSystem.Server = server
seaweedFileSystem.StartBackgroundTasks()
- err = server.Serve(seaweedFileSystem)
- // check if the mount process has an error to report
- <-c.Ready
- if err := c.MountError; err != nil {
- glog.V(0).Infof("mount process: %v", err)
- return true
- }
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
+
+ server.Serve()
return true
}
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 35d59ea20..3274f599b 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -95,7 +95,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
}, grpcDialOption)
// start grpc listener
- grpcL, err := util.NewListener(util.JoinHostPort("", *msgBrokerOpt.port), 0)
+ grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index ee726fcec..467da73fd 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -34,12 +34,13 @@ type S3Options struct {
metricsHttpPort *int
allowEmptyFolder *bool
auditLogConfig *string
+ localFilerSocket *string
}
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
- s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to")
+ s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
@@ -184,6 +185,7 @@ func (s3opt *S3Options) startS3Server() bool {
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
+ LocalFilerSocket: s3opt.localFilerSocket,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
@@ -191,8 +193,12 @@ func (s3opt *S3Options) startS3Server() bool {
httpS := &http.Server{Handler: router}
+ if *s3opt.bindIp == "" {
+ *s3opt.bindIp = "localhost"
+ }
+
listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port)
- s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
+ s3ApiListener, s3ApiLocalListner, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second)
if err != nil {
glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
}
@@ -206,11 +212,25 @@ func (s3opt *S3Options) startS3Server() bool {
if *s3opt.tlsPrivateKey != "" {
glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
+ if s3ApiLocalListner != nil {
+ go func() {
+ if err = httpS.ServeTLS(s3ApiLocalListner, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
+ glog.Fatalf("S3 API Server Fail to serve: %v", err)
+ }
+ }()
+ }
if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
} else {
glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
+ if s3ApiLocalListner != nil {
+ go func() {
+ if err = httpS.Serve(s3ApiLocalListner); err != nil {
+ glog.Fatalf("S3 API Server Fail to serve: %v", err)
+ }
+ }()
+ }
if err = httpS.Serve(s3ApiListener); err != nil {
glog.Fatalf("S3 API Server Fail to serve: %v", err)
}
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 77c6cd58b..5d4513c36 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -195,6 +195,40 @@ routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
+[redis_lua]
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[redis_lua_sentinel]
+enabled = false
+addresses = ["172.22.12.7:26379","172.22.12.8:26379","172.22.12.9:26379"]
+masterName = "master"
+username = ""
+password = ""
+database = 0
+
+[redis_lua_cluster]
+enabled = false
+addresses = [
+ "localhost:30001",
+ "localhost:30002",
+ "localhost:30003",
+ "localhost:30004",
+ "localhost:30005",
+ "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
[redis3] # beta
enabled = false
address = "localhost:6379"
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
index 090f4f664..38a803dd6 100644
--- a/weed/command/scaffold/security.toml
+++ b/weed/command/scaffold/security.toml
@@ -83,7 +83,13 @@ key = ""
# this does not work with other clients, e.g., "weed filer|mount" etc, yet.
[https.client]
enabled = true
+
[https.volume]
cert = ""
key = ""
+ca = ""
+[https.master]
+cert = ""
+key = ""
+ca = ""
diff --git a/weed/command/server.go b/weed/command/server.go
index 01c59fb85..0cc60fd30 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -55,7 +55,7 @@ var cmdServer = &Command{
var (
serverIp = cmdServer.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
- serverBindIp = cmdServer.Flag.String("ip.bind", "", "ip address to bind to")
+ serverBindIp = cmdServer.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
@@ -98,6 +98,8 @@ func init() {
masterOptions.metricsAddress = cmdServer.Flag.String("metrics.address", "", "Prometheus gateway address")
masterOptions.metricsIntervalSec = cmdServer.Flag.Int("metrics.intervalSeconds", 15, "Prometheus push interval in seconds")
masterOptions.raftResumeState = cmdServer.Flag.Bool("resumeState", false, "resume previous state on start master server")
+ masterOptions.heartbeatInterval = cmdServer.Flag.Duration("master.heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
+ masterOptions.electionTimeout = cmdServer.Flag.Duration("master.electionTimeout", 10*time.Second, "election timeout of master servers")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
@@ -110,6 +112,7 @@ func init() {
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+ filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
@@ -145,7 +148,7 @@ func init() {
webdavOptions.tlsPrivateKey = cmdServer.Flag.String("webdav.key.file", "", "path to the TLS private key file")
webdavOptions.tlsCertificate = cmdServer.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
- webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB")
+ webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
@@ -181,13 +184,19 @@ func runServer(cmd *Command, args []string) bool {
masterOptions.peers = &peers
}
+ if *serverBindIp == "" {
+ serverBindIp = serverIp
+ }
+
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
+ filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp
+ iamOptions.ip = serverBindIp
+ iamOptions.masters = masterOptions.peers
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
serverOptions.v.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
@@ -242,6 +251,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingS3 {
go func() {
time.Sleep(2 * time.Second)
+ s3Options.localFilerSocket = filerOptions.localSocket
s3Options.startS3Server()
}()
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 5b9d94b9a..645c698b1 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -2,7 +2,6 @@ package command
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
httppprof "net/http/pprof"
"os"
@@ -11,6 +10,8 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+
"github.com/spf13/viper"
"google.golang.org/grpc"
@@ -24,7 +25,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -74,7 +75,7 @@ func init() {
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
- v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to")
+ v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
@@ -94,7 +95,7 @@ func init() {
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
- v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port")
+ v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port")
}
var cmdVolume = &Command{
@@ -193,6 +194,9 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.ip = util.DetectedHostAddress()
glog.V(0).Infof("detected volume server ip address: %v", *v.ip)
}
+ if *v.bindIp == "" {
+ *v.bindIp = *v.ip
+ }
if *v.publicPort == 0 {
*v.publicPort = *v.port
@@ -364,11 +368,18 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
httpDown := httpdown.HTTP{
- KillTimeout: 5 * time.Minute,
- StopTimeout: 5 * time.Minute,
+ KillTimeout: time.Minute,
+ StopTimeout: 30 * time.Second,
CertFile: certFile,
KeyFile: keyFile}
- clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ httpS := &http.Server{Handler: handler}
+
+ if viper.GetString("https.volume.ca") != "" {
+ clientCertFile := viper.GetString("https.volume.ca")
+ httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
+ }
+
+ clusterHttpServer := httpDown.Serve(httpS, listener)
go func() {
if e := clusterHttpServer.Wait(); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e)
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 319302175..689bf3c30 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -43,7 +43,7 @@ func init() {
webDavStandaloneOptions.tlsPrivateKey = cmdWebDav.Flag.String("key.file", "", "path to the TLS private key file")
webDavStandaloneOptions.tlsCertificate = cmdWebDav.Flag.String("cert.file", "", "path to the TLS certificate file")
webDavStandaloneOptions.cacheDir = cmdWebDav.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks")
- webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 1000, "local cache capacity in MB")
+ webDavStandaloneOptions.cacheSizeMB = cmdWebDav.Flag.Int64("cacheCapacityMB", 0, "local cache capacity in MB")
}
var cmdWebDav = &Command{