aboutsummaryrefslogtreecommitdiff
path: root/go/topology/topology.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology/topology.go')
-rw-r--r--go/topology/topology.go27
1 files changed, 24 insertions, 3 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d72879035..055d273cd 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "github.com/goraft/raft"
"io/ioutil"
"math/rand"
)
@@ -12,8 +13,6 @@ import (
type Topology struct {
NodeImpl
- IsLeader bool
-
collectionMap map[string]*Collection
pulse int64
@@ -27,6 +26,8 @@ type Topology struct {
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
+
+ RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
@@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
return t, err
}
+func (t *Topology) IsLeader() bool {
+ return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
+}
+
+func (t *Topology) Leader() string {
+ l := ""
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ }
+
+ if l == "" {
+ // We are a single node cluster, we are the leader
+ return t.RaftServer.Name()
+ }
+
+ return l
+}
+
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
- return vid.Next()
+ next := vid.Next()
+ go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+ return next
}
func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {