aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer.go')
-rw-r--r--weed/command/filer.go103
1 files changed, 77 insertions, 26 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index ddee0852c..c9f9a1956 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -2,10 +2,10 @@ package command
import (
"fmt"
+ "net"
"net/http"
"os"
- "strconv"
- "strings"
+ "runtime"
"time"
"google.golang.org/grpc/reflection"
@@ -14,7 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -30,11 +30,14 @@ var (
)
type FilerOptions struct {
- masters *string
+ masters map[string]pb.ServerAddress
+ mastersString *string
ip *string
bindIp *string
port *int
+ portGrpc *int
publicPort *int
+ filerGroup *string
collection *string
defaultReplicaPlacement *string
disableDirListing *bool
@@ -45,22 +48,25 @@ type FilerOptions struct {
enableNotification *bool
disableHttp *bool
cipher *bool
- peers *string
metricsHttpPort *int
saveToFilerLimit *int
defaultLevelDbDirectory *string
concurrentUploadLimitMB *int
debug *bool
debugPort *int
+ localSocket *string
+ showUIDirectoryDelete *bool
}
func init() {
cmdFiler.Run = runFiler // break init cycle
- f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
- f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to")
+ f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
+ f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
@@ -70,22 +76,26 @@ func init() {
f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
- f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
+ f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
+ f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
+ filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
- filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
+ filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
+ filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
+ filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
// start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
@@ -96,10 +106,11 @@ func init() {
filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
- filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 1000, "local cache capacity in MB")
+ filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
// start iam on filer
filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service")
+ filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address")
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
@@ -134,10 +145,12 @@ func runFiler(cmd *Command, args []string) bool {
go stats_collect.StartMetricsServer(*f.metricsHttpPort)
- filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port)
+ filerAddress := util.JoinHostPort(*f.ip, *f.port)
startDelay := time.Duration(2)
if *filerStartS3 {
filerS3Options.filer = &filerAddress
+ filerS3Options.bindIp = f.bindIp
+ filerS3Options.localFilerSocket = f.localSocket
go func() {
time.Sleep(startDelay * time.Second)
filerS3Options.startS3Server()
@@ -156,13 +169,15 @@ func runFiler(cmd *Command, args []string) bool {
if *filerStartIam {
filerIamOptions.filer = &filerAddress
- filerIamOptions.masters = f.masters
+ filerIamOptions.masters = f.mastersString
go func() {
time.Sleep(startDelay * time.Second)
filerIamOptions.startIamServer()
}()
}
+ f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
+
f.startFiler()
return true
@@ -176,16 +191,20 @@ func (fo *FilerOptions) startFiler() {
if *fo.publicPort != 0 {
publicVolumeMux = http.NewServeMux()
}
+ if *fo.portGrpc == 0 {
+ *fo.portGrpc = 10000 + *fo.port
+ }
+ if *fo.bindIp == "" {
+ *fo.bindIp = *fo.ip
+ }
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
- var peers []string
- if *fo.peers != "" {
- peers = strings.Split(*fo.peers, ",")
- }
+ filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: strings.Split(*fo.masters, ","),
+ Masters: fo.masters,
+ FilerGroup: *fo.filerGroup,
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
DisableDirListing: *fo.disableDirListing,
@@ -195,21 +214,20 @@ func (fo *FilerOptions) startFiler() {
Rack: *fo.rack,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
- Host: *fo.ip,
- Port: uint32(*fo.port),
+ Host: filerAddress,
Cipher: *fo.cipher,
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
- Filers: peers,
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
+ ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
}
if *fo.publicPort != 0 {
- publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort)
+ publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort)
glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, 0)
+ publicListener, localPublicListner, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0)
if e != nil {
glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
}
@@ -218,11 +236,18 @@ func (fo *FilerOptions) startFiler() {
glog.Fatalf("Volume server fail to serve public: %v", e)
}
}()
+ if localPublicListner != nil {
+ go func() {
+ if e := http.Serve(localPublicListner, publicVolumeMux); e != nil {
+ glog.Errorf("Volume server fail to serve public: %v", e)
+ }
+ }()
+ }
}
glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
- filerListener, e := util.NewListener(
- *fo.bindIp+":"+strconv.Itoa(*fo.port),
+ filerListener, filerLocalListener, e := util.NewIpAndLocalListeners(
+ *fo.bindIp, *fo.port,
time.Duration(10)*time.Second,
)
if e != nil {
@@ -230,17 +255,43 @@ func (fo *FilerOptions) startFiler() {
}
// starting grpc server
- grpcPort := *fo.port + 10000
- grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0)
+ grpcPort := *fo.portGrpc
+ grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
+ if grpcLocalL != nil {
+ go grpcS.Serve(grpcLocalL)
+ }
go grpcS.Serve(grpcL)
httpS := &http.Server{Handler: defaultMux}
+ if runtime.GOOS != "windows" {
+ if *fo.localSocket == "" {
+ *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
+ }
+ if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
+ }
+ go func() {
+ // start on local unix socket
+ filerSocketListener, err := net.Listen("unix", *fo.localSocket)
+ if err != nil {
+ glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
+ }
+ httpS.Serve(filerSocketListener)
+ }()
+ }
+ if filerLocalListener != nil {
+ go func() {
+ if err := httpS.Serve(filerLocalListener); err != nil {
+ glog.Errorf("Filer Fail to serve: %v", e)
+ }
+ }()
+ }
if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}