diff options
| -rw-r--r-- | docker/Makefile | 3 | ||||
| -rw-r--r-- | docker/compose/local-hashicorp-raft-compose.yml | 89 | ||||
| -rw-r--r-- | go.mod | 11 | ||||
| -rw-r--r-- | weed/command/master.go | 45 | ||||
| -rw-r--r-- | weed/server/master_server.go | 42 | ||||
| -rw-r--r-- | weed/server/raft_hashicorp.go | 146 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 64 | ||||
| -rw-r--r-- | weed/server/raft_server_handlers.go | 8 | ||||
| -rw-r--r-- | weed/topology/cluster_commands.go | 20 | ||||
| -rw-r--r-- | weed/topology/topology.go | 25 |
10 files changed, 419 insertions, 34 deletions
diff --git a/docker/Makefile b/docker/Makefile index 446bb5b47..76cdf75c8 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -55,6 +55,9 @@ cluster: build 2clusters: build docker-compose -f compose/local-clusters-compose.yml -p seaweedfs up +hashicorp_raft: build + docker-compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up + s3tests: build s3tests_build docker-compose -f compose/local-s3tests-compose.yml -p seaweedfs up diff --git a/docker/compose/local-hashicorp-raft-compose.yml b/docker/compose/local-hashicorp-raft-compose.yml new file mode 100644 index 000000000..69e80a446 --- /dev/null +++ b/docker/compose/local-hashicorp-raft-compose.yml @@ -0,0 +1,89 @@ +version: '2' + +services: + master0: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master0 -port=9333 -peers=master1:9334,master2:9335 -mdir=/data" + volumes: + - ./master/0:/data + environment: + WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 + master1: + image: chrislusf/seaweedfs:local + ports: + - 9334:9334 + - 19334:19334 + command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master1 -port=9334 -peers=master0:9333,master2:9335 -mdir=/data" + volumes: + - ./master/1:/data + environment: + WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 + master2: + image: chrislusf/seaweedfs:local + ports: + - 9335:9335 + - 19335:19335 + command: "-v=4 master -volumeSizeLimitMB 100 -resumeState=false -raftHashicorp=true -ip=master2 -port=9335 -peers=master0:9333,master1:9334 -mdir=/data" + volumes: + - ./master/2:/data + environment: + WEED_MASTER_VOLUME_GROWTH_COPY_1: 1 + WEED_MASTER_VOLUME_GROWTH_COPY_2: 2 + WEED_MASTER_VOLUME_GROWTH_COPY_OTHER: 1 + volume1: + image: chrislusf/seaweedfs:local + ports: + - 8080:8080 + - 18080:18080 + command: 'volume -dataCenter=dc1 -rack=v1 -mserver="master0:9333,master1:9334,master2:9335" -port=8080 -ip=volume1 -publicUrl=localhost:8080 -preStopSeconds=1' + depends_on: + - master0 + - master1 + volume2: + image: chrislusf/seaweedfs:local + ports: + - 8082:8082 + - 18082:18082 + command: 'volume -dataCenter=dc2 -rack=v2 -mserver="master0:9333,master1:9334,master2:9335" -port=8082 -ip=volume2 -publicUrl=localhost:8082 -preStopSeconds=1' + depends_on: + - master0 + - master1 + volume3: + image: chrislusf/seaweedfs:local + ports: + - 8083:8083 + - 18083:18083 + command: 'volume -dataCenter=dc3 -rack=v3 -mserver="master0:9333,master1:9334,master2:9335" -port=8083 -ip=volume3 -publicUrl=localhost:8083 -preStopSeconds=1' + depends_on: + - master0 + - master1 + filer: + image: chrislusf/seaweedfs:local + ports: + - 8888:8888 + - 18888:18888 + - 8111:8111 + command: 'filer -defaultReplicaPlacement=100 -iam -master="master0:9333,master1:9334,master2:9335"' + depends_on: + - master0 + - master1 + - volume1 + - volume2 + s3: + image: chrislusf/seaweedfs:local + ports: + - 8333:8333 + command: '-v=9 s3 -ip.bind="s3" -filer="filer:8888"' + depends_on: + - master0 + - master1 + - volume1 + - volume2 + - filer
\ No newline at end of file @@ -155,6 +155,8 @@ require ( require ( cloud.google.com/go/compute v1.5.0 // indirect cloud.google.com/go/iam v0.3.0 // indirect + github.com/Jille/raft-grpc-transport v1.2.0 // indirect + github.com/armon/go-metrics v0.3.10 // indirect github.com/aws/aws-sdk-go-v2 v1.16.2 // indirect github.com/aws/aws-sdk-go-v2/config v1.15.3 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.11.2 // indirect @@ -168,16 +170,25 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.11.3 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.16.3 // indirect github.com/aws/smithy-go v1.11.2 // indirect + github.com/boltdb/bolt v1.3.1 // indirect github.com/d4l3k/messagediff v1.2.1 // indirect + github.com/fatih/color v1.13.0 // indirect github.com/fclairamb/go-log v0.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 // indirect + github.com/hashicorp/go-hclog v1.0.0 // indirect + github.com/hashicorp/go-immutable-radix v1.3.1 // indirect + github.com/hashicorp/go-msgpack v1.1.5 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/raft v1.3.6 // indirect + github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/goidentity/v6 v6.0.1 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.0.6 // indirect + github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-runewidth v0.0.7 // indirect github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d // indirect diff --git a/weed/command/master.go b/weed/command/master.go index 3d69f4216..37cfaf252 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -48,6 +48,8 @@ type MasterOptions struct { metricsHttpPort *int heartbeatInterval *time.Duration electionTimeout *time.Duration + raftHashicorp *bool + raftBootstrap *bool } func init() { @@ -71,6 +73,8 @@ func init() { m.raftResumeState = cmdMaster.Flag.Bool("resumeState", false, "resume previous state on start master server") m.heartbeatInterval = cmdMaster.Flag.Duration("heartbeatInterval", 300*time.Millisecond, "heartbeat interval of master servers, and will be randomly multiplied by [1, 1.25)") m.electionTimeout = cmdMaster.Flag.Duration("electionTimeout", 10*time.Second, "election timeout of master servers") + m.raftHashicorp = cmdMaster.Flag.Bool("raftHashicorp", false, "use hashicorp raft") + m.raftBootstrap = cmdMaster.Flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") } var cmdMaster = &Command{ @@ -156,13 +160,25 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { RaftResumeState: *masterOption.raftResumeState, HeartbeatInterval: *masterOption.heartbeatInterval, ElectionTimeout: *masterOption.electionTimeout, + RaftBootstrap: *m.raftBootstrap, } - raftServer, err := weed_server.NewRaftServer(raftServerOption) - if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + var raftServer *weed_server.RaftServer + var err error + if *m.raftHashicorp { + if raftServer, err = weed_server.NewHashicorpRaftServer(raftServerOption); err != nil { + glog.Fatalf("NewHashicorpRaftServer: %s", err) + } + } else { + raftServer, err = weed_server.NewRaftServer(raftServerOption) + if raftServer == nil { + glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + if *m.raftHashicorp { + r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") + } // starting grpc server grpcPort := *masterOption.portGrpc grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOption.ipBind, grpcPort, 0) @@ -171,7 +187,11 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) - protobuf.RegisterRaftServer(grpcS, raftServer) + if *m.raftHashicorp { + raftServer.TransportManager.Register(grpcS) + } else { + protobuf.RegisterRaftServer(grpcS, raftServer) + } reflection.Register(grpcS) glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOption.ipBind, grpcPort) if grpcLocalL != nil { @@ -179,14 +199,17 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } go grpcS.Serve(grpcL) - go func() { - time.Sleep(1500 * time.Millisecond) - if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { - if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { - raftServer.DoJoinCommand() + timeSleep := 1500 * time.Millisecond + if !*m.raftHashicorp { + go func() { + time.Sleep(timeSleep) + if ms.Topo.RaftServer.Leader() == "" && ms.Topo.RaftServer.IsLogEmpty() && isTheFirstOne(myMasterAddress, peers) { + if ms.MasterClient.FindLeaderFromOtherPeers(myMasterAddress) == "" { + raftServer.DoJoinCommand() + } } - } - }() + }() + } go ms.MasterClient.KeepConnectedToMaster() diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b63e3a418..37e7f245c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -160,19 +160,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go new file mode 100644 index 000000000..e19839922 --- /dev/null +++ b/weed/server/raft_hashicorp.go @@ -0,0 +1,146 @@ +package weed_server + +// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28 +// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 + +import ( + "fmt" + transport "github.com/Jille/raft-grpc-transport" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/hashicorp/raft" + boltdb "github.com/hashicorp/raft-boltdb" + "google.golang.org/grpc" + "math/rand" + "os" + "path/filepath" + "time" +) + +func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { + for _, peer := range s.peers { + cfg.Servers = append(cfg.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(peer.String()), + Address: raft.ServerAddress(peer.ToGrpcAddress()), + }) + } + return cfg +} + +func (s *RaftServer) UpdatePeers() { + for { + select { + case isLeader := <-s.RaftHashicorp.LeaderCh(): + if isLeader { + peerLeader := s.serverAddr.String() + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + if peer.String() == peerLeader || existsPeerName[peer.String()] { + continue + } + glog.V(0).Infof("adding new peer: %s", peer.String()) + s.RaftHashicorp.AddVoter( + raft.ServerID(peer.String()), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer, _ := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } + } + break + } + } +} + +func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { + s := &RaftServer{ + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, + } + + c := raft.DefaultConfig() + c.LocalID = raft.ServerID(s.serverAddr.String()) // TODO maybee the IP:port address will change + c.NoSnapshotRestoreOnStart = option.RaftResumeState + c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + c.ElectionTimeout = option.ElectionTimeout + if c.LeaderLeaseTimeout > c.HeartbeatTimeout { + c.LeaderLeaseTimeout = c.HeartbeatTimeout + } + if glog.V(4) { + c.LogLevel = "Debug" + } else if glog.V(2) { + c.LogLevel = "Info" + } else if glog.V(1) { + c.LogLevel = "Warn" + } else if glog.V(0) { + c.LogLevel = "Error" + } + + baseDir := s.dataDir + + ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "logs.dat")) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + } + + sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, "stable.dat")) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) + } + + fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) + if err != nil { + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + } + + s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) + + stateMachine := StateMachine{topo: option.Topo} + s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + if err != nil { + return nil, fmt.Errorf("raft.NewRaft: %v", err) + } + if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { + cfg := s.AddPeersConfiguration() + glog.V(0).Infoln("Bootstrapping new cluster %+v", cfg) + f := s.RaftHashicorp.BootstrapCluster(cfg) + if err := f.Error(); err != nil { + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + } + } else { + go s.UpdatePeers() + } + + ticker := time.NewTicker(c.HeartbeatTimeout * 10) + if glog.V(4) { + go func() { + for { + select { + case <-ticker.C: + cfuture := s.RaftHashicorp.GetConfiguration() + if err = cfuture.Error(); err != nil { + glog.Fatalf("error getting config: %s", err) + } + configuration := cfuture.Configuration() + glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers) + } + } + }() + } + + return s, nil +} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d559cb691..8c372f0cc 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,6 +2,9 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" @@ -12,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -26,14 +30,17 @@ type RaftServerOption struct { RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers map[string]pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + RaftHashicorp *hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -42,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -132,12 +171,17 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { } func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, p.Name) + if s.raftServer != nil { + peers := s.raftServer.Peers() + for _, p := range peers { + members = append(members, p.Name) + } + } else if s.RaftHashicorp != nil { + cfg := s.RaftHashicorp.GetConfiguration() + for _, p := range cfg.Configuration().Servers { + members = append(members, string(p.ID)) + } } - return } diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 7e58f1e92..cc3e6e37f 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { } writeJsonQuiet(w, r, http.StatusOK, ret) } + +func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { + if s.RaftHashicorp == nil { + writeJsonQuiet(w, r, http.StatusNotFound, nil) + return + } + writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats()) +} diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 152691ccb..1bcc6b449 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -1,9 +1,12 @@ package topology import ( + "encoding/json" + "fmt" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + hashicorpRaft "github.com/hashicorp/raft" ) type MaxVolumeIdCommand struct { @@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string { return "MaxVolumeId" } +// deprecatedCommandApply represents the old interface to apply a command to the server. func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { topo := server.Context().(*Topology) before := topo.GetMaxVolumeId() @@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { return nil, nil } + +func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error { + b, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("marshal: %v", err) + } + _, err = sink.Write(b) + if err != nil { + sink.Cancel() + return fmt.Errorf("sink.Write(): %v", err) + } + return sink.Close() +} + +func (s *MaxVolumeIdCommand) Release() { +} diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 207c89ad7..d636e8a9e 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -1,6 +1,7 @@ package topology import ( + "encoding/json" "errors" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" @@ -10,6 +11,7 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -40,7 +42,8 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server + RaftServer raft.Server + HashicorpRaft *hashicorpRaft.Raft } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool { return true } } + } else if t.HashicorpRaft != nil { + if t.HashicorpRaft.State() == hashicorpRaft.Leader { + return true + } } return false } @@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) { for count := 0; count < 3; count++ { if t.RaftServer != nil { l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) } else { return "", errors.New("Raft Server not ready yet!") } @@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { - return 0, err + if t.RaftServer != nil { + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + } else if t.HashicorpRaft != nil { + b, err := json.Marshal(NewMaxVolumeIdCommand(next)) + if err != nil { + return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err) + } + if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil { + return 0, future.Error() + } } return next, nil } |
