aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/cluster_commands.go31
-rw-r--r--go/topology/topology.go27
-rw-r--r--go/topology/topology_event_handling.go4
3 files changed, 57 insertions, 5 deletions
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
new file mode 100644
index 000000000..dc0a40c8d
--- /dev/null
+++ b/go/topology/cluster_commands.go
@@ -0,0 +1,31 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "github.com/goraft/raft"
+)
+
+type MaxVolumeIdCommand struct {
+ MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+}
+
+func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+ return &MaxVolumeIdCommand{
+ MaxVolumeId: value,
+ }
+}
+
+func (c *MaxVolumeIdCommand) CommandName() string {
+ return "MaxVolumeId"
+}
+
+func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
+ topo := server.Context().(*Topology)
+ before := topo.GetMaxVolumeId()
+ topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
+
+ glog.V(0).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
+
+ return nil, nil
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d72879035..055d273cd 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "github.com/goraft/raft"
"io/ioutil"
"math/rand"
)
@@ -12,8 +13,6 @@ import (
type Topology struct {
NodeImpl
- IsLeader bool
-
collectionMap map[string]*Collection
pulse int64
@@ -27,6 +26,8 @@ type Topology struct {
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
+
+ RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
@@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
return t, err
}
+func (t *Topology) IsLeader() bool {
+ return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
+}
+
+func (t *Topology) Leader() string {
+ l := ""
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ }
+
+ if l == "" {
+ // We are a single node cluster, we are the leader
+ return t.RaftServer.Name()
+ }
+
+ return l
+}
+
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
- return vid.Next()
+ next := vid.Next()
+ go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+ return next
}
func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 710e7b2ae..7398ff9bf 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -10,7 +10,7 @@ import (
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() {
for {
- if t.IsLeader {
+ if t.IsLeader() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
@@ -19,7 +19,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
- if t.IsLeader {
+ if t.IsLeader() {
for _ = range c {
t.Vacuum(garbageThreshold)
}