diff options
| author | guosj <515878133@qq.com> | 2022-04-19 09:26:06 +0800 |
|---|---|---|
| committer | guosj <515878133@qq.com> | 2022-04-19 09:26:06 +0800 |
| commit | 94c702402e879843792acc4be2cf01198268f250 (patch) | |
| tree | 593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/command | |
| parent | 5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff) | |
| parent | 82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (diff) | |
| download | seaweedfs-94c702402e879843792acc4be2cf01198268f250.tar.xz seaweedfs-94c702402e879843792acc4be2cf01198268f250.zip | |
Merge branch 'chrislusf-master'
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/benchmark.go | 2 | ||||
| -rw-r--r-- | weed/command/filer.go | 37 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 5 | ||||
| -rw-r--r-- | weed/command/iam.go | 2 | ||||
| -rw-r--r-- | weed/command/master.go | 60 | ||||
| -rw-r--r-- | weed/command/master_follower.go | 2 | ||||
| -rw-r--r-- | weed/command/mount.go | 2 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 24 | ||||
| -rw-r--r-- | weed/command/s3.go | 41 | ||||
| -rw-r--r-- | weed/command/server.go | 3 |
10 files changed, 121 insertions, 57 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index af5919adf..7091463cc 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses()) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/filer.go b/weed/command/filer.go index 4f8fd947a..4dbc04a0c 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -5,6 +5,7 @@ import ( "net" "net/http" "os" + "runtime" "time" "google.golang.org/grpc/reflection" @@ -29,7 +30,7 @@ var ( ) type FilerOptions struct { - masters []pb.ServerAddress + masters map[string]pb.ServerAddress mastersString *string ip *string bindIp *string @@ -89,6 +90,7 @@ func init() { filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders") + filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") // start webdav on filer filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway") @@ -171,7 +173,7 @@ func runFiler(cmd *Command, args []string) bool { }() } - f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses() + f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap() f.startFiler() @@ -247,18 +249,6 @@ func (fo *FilerOptions) startFiler() { glog.Fatalf("Filer listener error: %v", e) } - // start on local unix socket - if *fo.localSocket == "" { - *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port) - if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) { - glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error()) - } - } - filerSocketListener, err := net.Listen("unix", *fo.localSocket) - if err != nil { - glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err) - } - // starting grpc server grpcPort := *fo.portGrpc grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0) @@ -274,9 +264,22 @@ func (fo *FilerOptions) startFiler() { go grpcS.Serve(grpcL) httpS := &http.Server{Handler: defaultMux} - go func() { - httpS.Serve(filerSocketListener) - }() + if runtime.GOOS != "windows" { + if *fo.localSocket == "" { + *fo.localSocket = fmt.Sprintf("/tmp/seaweefs-filer-%d.sock", *fo.port) + if err := os.Remove(*fo.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *fo.localSocket, err.Error()) + } + } + go func() { + // start on local unix socket + filerSocketListener, err := net.Listen("unix", *fo.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *fo.localSocket, err) + } + httpS.Serve(filerSocketListener) + }() + } if filerLocalListener != nil { go func() { if err := httpS.Serve(filerLocalListener); err != nil { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 37ce2aa73..e3d3b97bc 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -267,7 +267,10 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl return nil } key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) - return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + if !dataSink.IsIncremental() { + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + return nil } // handle new entries diff --git a/weed/command/iam.go b/weed/command/iam.go index 88b17b1a2..968d23095 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -67,7 +67,7 @@ func (iamopt *IamOptions) startIamServer() bool { } } - masters := pb.ServerAddresses(*iamopt.masters).ToAddresses() + masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap() router := mux.NewRouter().SkipClean(true) _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ Masters: masters, diff --git a/weed/command/master.go b/weed/command/master.go index 9e45c5037..9587df055 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,9 +1,9 @@ package command import ( + "golang.org/x/exp/slices" "net/http" "os" - "sort" "strings" "time" @@ -48,6 +48,8 @@ type MasterOptions struct { metricsHttpPort *int heartbeatInterval *time.Duration electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { @@ -71,6 +73,8 @@ func init() { m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -132,8 +136,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) + masterPeers := make(map[string]pb.ServerAddress) + for _, peer := range peers { + masterPeers[string(peer)] = peer + } + r := mux.NewRouter() - ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) + ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers) listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port) glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress) masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0) @@ -144,20 +153,32 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { // start raftServer raftServerOption := &weed_server.RaftServerOption{ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), - Peers: peers, + Peers: masterPeers, ServerAddr: myMasterAddress, DataDir: util.ResolvePath(*masterOption.metaFolder), Topo: ms.Topo, RaftResumeState: *masterOption.raftResumeState, HeartbeatInterval: *masterOption.heartbeatInterval, ElectionTimeout: *masterOption.electionTimeout, + RaftBootstrap: *m.raftBootstrap, } - raftServer, err := weed_server.NewRaftServer(raftServerOption) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + } // starting grpc server grpcPort := *masterOption.portGrpc grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) @@ -166,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) if grpcLocalL != nil { @@ -174,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() @@ -246,8 +274,8 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin } func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { - sort.Slice(peers, func(i, j int) bool { - return strings.Compare(string(peers[i]), string(peers[j])) < 0 + slices.SortFunc(peers, func(a, b pb.ServerAddress) bool { + return strings.Compare(string(a), string(b)) < 0 }) if len(peers) <= 0 { return true diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index f182d7ce4..ec7d2758f 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -83,7 +83,7 @@ func runMasterFollower(cmd *Command, args []string) bool { func startMasterFollower(masterOptions MasterOptions) { // collect settings from main masters - masters := pb.ServerAddresses(*mf.peers).ToAddresses() + masters := pb.ServerAddresses(*mf.peers).ToAddressMap() var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") diff --git a/weed/command/mount.go b/weed/command/mount.go index 428e073f2..2569bc3dc 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -29,6 +29,7 @@ type MountOptions struct { readOnly *bool debug *bool debugPort *int + localSocket *string } var ( @@ -63,6 +64,7 @@ func init() { mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2") mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") + mountOptions.localSocket = cmdMount.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-<mount_dir_hash>.sock") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index d865e053f..1d929dc96 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -12,9 +12,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/mount/unmount" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/hanwen/go-fuse/v2/fuse" + "google.golang.org/grpc/reflection" + "net" "net/http" "os" "os/user" @@ -98,6 +101,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { unmount.Unmount(dir) + // start on local unix socket + if *option.localSocket == "" { + mountDirHash := util.HashToInt32([]byte(dir)) + if mountDirHash < 0 { + mountDirHash = -mountDirHash + } + *option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash) + } + if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error()) + } + montSocketListener, err := net.Listen("unix", *option.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err) + } + // detect mount folder mode if *option.dirAutoCreate { os.MkdirAll(dir, os.FileMode(0777)&^umask) @@ -229,6 +248,11 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { unmount.Unmount(dir) }) + grpcS := pb.NewGrpcServer() + mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem) + reflection.Register(grpcS) + go grpcS.Serve(montSocketListener) + seaweedFileSystem.StartBackgroundTasks() fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) diff --git a/weed/command/s3.go b/weed/command/s3.go index 467da73fd..c28f3016e 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -24,17 +24,18 @@ var ( ) type S3Options struct { - filer *string - bindIp *string - port *int - config *string - domainName *string - tlsPrivateKey *string - tlsCertificate *string - metricsHttpPort *int - allowEmptyFolder *bool - auditLogConfig *string - localFilerSocket *string + filer *string + bindIp *string + port *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string + metricsHttpPort *int + allowEmptyFolder *bool + allowDeleteBucketNotEmpty *bool + auditLogConfig *string + localFilerSocket *string } func init() { @@ -49,6 +50,7 @@ func init() { s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders") + s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") } var cmdS3 = &Command{ @@ -178,14 +180,15 @@ func (s3opt *S3Options) startS3Server() bool { router := mux.NewRouter().SkipClean(true) _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: filerAddress, - Port: *s3opt.port, - Config: *s3opt.config, - DomainName: *s3opt.domainName, - BucketsPath: filerBucketsPath, - GrpcDialOption: grpcDialOption, - AllowEmptyFolder: *s3opt.allowEmptyFolder, - LocalFilerSocket: s3opt.localFilerSocket, + Filer: filerAddress, + Port: *s3opt.port, + Config: *s3opt.config, + DomainName: *s3opt.domainName, + BucketsPath: filerBucketsPath, + GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, + AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty, + LocalFilerSocket: s3opt.localFilerSocket, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/server.go b/weed/command/server.go index a1b495c5f..e3aec67d1 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -138,6 +138,7 @@ func init() { s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") s3Options.auditLogConfig = cmdServer.Flag.String("s3.auditLogConfig", "", "path to the audit log config file") s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders") + s3Options.allowDeleteBucketNotEmpty = cmdServer.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") iamOptions.port = cmdServer.Flag.Int("iam.port", 8111, "iam server http listen port") @@ -191,7 +192,7 @@ func runServer(cmd *Command, args []string) bool { // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses() + filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap() filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp s3Options.bindIp = serverBindIp |
