aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorguosj <515878133@qq.com>2022-04-19 09:26:06 +0800
committerguosj <515878133@qq.com>2022-04-19 09:26:06 +0800
commit94c702402e879843792acc4be2cf01198268f250 (patch)
tree593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/command
parent5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff)
parent82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (diff)
downloadseaweedfs-94c702402e879843792acc4be2cf01198268f250.tar.xz
seaweedfs-94c702402e879843792acc4be2cf01198268f250.zip
Merge branch 'chrislusf-master'
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/filer.go37
-rw-r--r--weed/command/filer_sync.go5
-rw-r--r--weed/command/iam.go2
-rw-r--r--weed/command/master.go60
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/mount.go2
-rw-r--r--weed/command/mount_std.go24
-rw-r--r--weed/command/s3.go41
-rw-r--r--weed/command/server.go3
10 files changed, 121 insertions, 57 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index af5919adf..7091463cc 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses())
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 4f8fd947a..4dbc04a0c 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -5,6 +5,7 @@ import (
"net"
"net/http"
"os"
+ "runtime"
"time"
"google.golang.org/grpc/reflection"
@@ -29,7 +30,7 @@ var (
)
type FilerOptions struct {
- masters []pb.ServerAddress
+ masters map[string]pb.ServerAddress
mastersString *string
ip *string
bindIp *string
@@ -89,6 +90,7 @@ func init() {
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
+ filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
// start webdav on filer
filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
@@ -171,7 +173,7 @@ func runFiler(cmd *Command, args []string) bool {
}()
}
- f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses()
+ f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.startFiler()
@@ -247,18 +249,6 @@ func (fo *FilerOptions) startFiler() {
glog.Fatalf("Filer listener error: %v", e)
}
- // start on local unix socket
- if *fo.localSocket == "" {
- *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
- if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
- glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
- }
- }
- filerSocketListener, err := net.Listen("unix", *fo.localSocket)
- if err != nil {
- glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
- }
-
// starting grpc server
grpcPort := *fo.portGrpc
grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0)
@@ -274,9 +264,22 @@ func (fo *FilerOptions) startFiler() {
go grpcS.Serve(grpcL)
httpS := &http.Server{Handler: defaultMux}
- go func() {
- httpS.Serve(filerSocketListener)
- }()
+ if runtime.GOOS != "windows" {
+ if *fo.localSocket == "" {
+ *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port)
+ if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error())
+ }
+ }
+ go func() {
+ // start on local unix socket
+ filerSocketListener, err := net.Listen("unix", *fo.localSocket)
+ if err != nil {
+ glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err)
+ }
+ httpS.Serve(filerSocketListener)
+ }()
+ }
if filerLocalListener != nil {
go func() {
if err := httpS.Serve(filerLocalListener); err != nil {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 37ce2aa73..e3d3b97bc 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -267,7 +267,10 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
- return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ if !dataSink.IsIncremental() {
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ return nil
}
// handle new entries
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 88b17b1a2..968d23095 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -67,7 +67,7 @@ func (iamopt *IamOptions) startIamServer() bool {
}
}
- masters := pb.ServerAddresses(*iamopt.masters).ToAddresses()
+ masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters,
diff --git a/weed/command/master.go b/weed/command/master.go
index 9e45c5037..9587df055 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,9 +1,9 @@
package command
import (
+ "golang.org/x/exp/slices"
"net/http"
"os"
- "sort"
"strings"
"time"
@@ -48,6 +48,8 @@ type MasterOptions struct {
metricsHttpPort *int
heartbeatInterval *time.Duration
electionTimeout *time.Duration
+ raftHashicorp *bool
+ raftBootstrap *bool
}
func init() {
@@ -71,6 +73,8 @@ func init() {
m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server")
m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)")
m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers")
+ m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft")
+ m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
}
var cmdMaster = &Command{
@@ -132,8 +136,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
+ masterPeers := make(map[string]pb.ServerAddress)
+ for _, peer := range peers {
+ masterPeers[string(peer)] = peer
+ }
+
r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
@@ -144,20 +153,32 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
// start raftServer
raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
- Peers: peers,
+ Peers: masterPeers,
ServerAddr: myMasterAddress,
DataDir: util.ResolvePath(*masterOption.metaFolder),
Topo: ms.Topo,
RaftResumeState: *masterOption.raftResumeState,
HeartbeatInterval: *masterOption.heartbeatInterval,
ElectionTimeout: *masterOption.electionTimeout,
+ RaftBootstrap: *m.raftBootstrap,
}
- raftServer, err := weed_server.NewRaftServer(raftServerOption)
- if raftServer == nil {
- glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ var raftServer *weed_server.RaftServer
+ var err error
+ if *m.raftHashicorp {
+ if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil {
+ glog.Fatalf("NewHashicorpRaftServer: %s", err)
+ }
+ } else {
+ raftServer, err = weed_server.NewRaftServer(raftServerOption)
+ if raftServer == nil {
+ glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
+ }
}
ms.SetRaftServer(raftServer)
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
+ if *m.raftHashicorp {
+ r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
+ }
// starting grpc server
grpcPort := *masterOption.portGrpc
grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0)
@@ -166,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
- protobuf.RegisterRaftServer(grpcS, raftServer)
+ if *m.raftHashicorp {
+ raftServer.TransportManager.Register(grpcS)
+ } else {
+ protobuf.RegisterRaftServer(grpcS, raftServer)
+ }
reflection.Register(grpcS)
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort)
if grpcLocalL != nil {
@@ -174,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
go grpcS.Serve(grpcL)
- go func() {
- time.Sleep(1500 * time.Millisecond)
- if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
- if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
- raftServer.DoJoinCommand()
+ timeSleep := 1500 * time.Millisecond
+ if !*m.raftHashicorp {
+ go func() {
+ time.Sleep(timeSleep)
+ if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) {
+ if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" {
+ raftServer.DoJoinCommand()
+ }
}
- }
- }()
+ }()
+ }
go ms.MasterClient.KeepConnectedToMaster()
@@ -246,8 +274,8 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin
}
func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
- sort.Slice(peers, func(i, j int) bool {
- return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ slices.SortFunc(peers, func(a, b pb.ServerAddress) bool {
+ return strings.Compare(string(a), string(b)) < 0
})
if len(peers) <= 0 {
return true
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index f182d7ce4..ec7d2758f 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -83,7 +83,7 @@ func runMasterFollower(cmd *Command, args []string) bool {
func startMasterFollower(masterOptions MasterOptions) {
// collect settings from main masters
- masters := pb.ServerAddresses(*mf.peers).ToAddresses()
+ masters := pb.ServerAddresses(*mf.peers).ToAddressMap()
var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 428e073f2..2569bc3dc 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -29,6 +29,7 @@ type MountOptions struct {
readOnly *bool
debug *bool
debugPort *int
+ localSocket *string
}
var (
@@ -63,6 +64,7 @@ func init() {
mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only")
mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging")
+ mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index d865e053f..1d929dc96 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -12,9 +12,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/mount/unmount"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/hanwen/go-fuse/v2/fuse"
+ "google.golang.org/grpc/reflection"
+ "net"
"net/http"
"os"
"os/user"
@@ -98,6 +101,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
unmount.Unmount(dir)
+ // start on local unix socket
+ if *option.localSocket == "" {
+ mountDirHash := util.HashToInt32([]byte(dir))
+ if mountDirHash < 0 {
+ mountDirHash = -mountDirHash
+ }
+ *option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash)
+ }
+ if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
+ glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
+ }
+ montSocketListener, err := net.Listen("unix", *option.localSocket)
+ if err != nil {
+ glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
+ }
+
// detect mount folder mode
if *option.dirAutoCreate {
os.MkdirAll(dir, os.FileMode(0777)&^umask)
@@ -229,6 +248,11 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
unmount.Unmount(dir)
})
+ grpcS := pb.NewGrpcServer()
+ mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
+ reflection.Register(grpcS)
+ go grpcS.Serve(montSocketListener)
+
seaweedFileSystem.StartBackgroundTasks()
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 467da73fd..c28f3016e 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -24,17 +24,18 @@ var (
)
type S3Options struct {
- filer *string
- bindIp *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
- metricsHttpPort *int
- allowEmptyFolder *bool
- auditLogConfig *string
- localFilerSocket *string
+ filer *string
+ bindIp *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
+ allowEmptyFolder *bool
+ allowDeleteBucketNotEmpty *bool
+ auditLogConfig *string
+ localFilerSocket *string
}
func init() {
@@ -49,6 +50,7 @@ func init() {
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
+ s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
}
var cmdS3 = &Command{
@@ -178,14 +180,15 @@ func (s3opt *S3Options) startS3Server() bool {
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
- Filer: filerAddress,
- Port: *s3opt.port,
- Config: *s3opt.config,
- DomainName: *s3opt.domainName,
- BucketsPath: filerBucketsPath,
- GrpcDialOption: grpcDialOption,
- AllowEmptyFolder: *s3opt.allowEmptyFolder,
- LocalFilerSocket: s3opt.localFilerSocket,
+ Filer: filerAddress,
+ Port: *s3opt.port,
+ Config: *s3opt.config,
+ DomainName: *s3opt.domainName,
+ BucketsPath: filerBucketsPath,
+ GrpcDialOption: grpcDialOption,
+ AllowEmptyFolder: *s3opt.allowEmptyFolder,
+ AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
+ LocalFilerSocket: s3opt.localFilerSocket,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index a1b495c5f..e3aec67d1 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -138,6 +138,7 @@ func init() {
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
+ s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port")
@@ -191,7 +192,7 @@ func runServer(cmd *Command, args []string) bool {
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
+ filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp