diff options
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/cluster_commands.go | 31 | ||||
| -rw-r--r-- | go/topology/topology.go | 27 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 4 |
3 files changed, 57 insertions, 5 deletions
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go new file mode 100644 index 000000000..dc0a40c8d --- /dev/null +++ b/go/topology/cluster_commands.go @@ -0,0 +1,31 @@ +package topology + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/storage" + "github.com/goraft/raft" +) + +type MaxVolumeIdCommand struct { + MaxVolumeId storage.VolumeId `json:"maxVolumeId"` +} + +func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { + return &MaxVolumeIdCommand{ + MaxVolumeId: value, + } +} + +func (c *MaxVolumeIdCommand) CommandName() string { + return "MaxVolumeId" +} + +func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { + topo := server.Context().(*Topology) + before := topo.GetMaxVolumeId() + topo.UpAdjustMaxVolumeId(c.MaxVolumeId) + + glog.V(0).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId()) + + return nil, nil +} diff --git a/go/topology/topology.go b/go/topology/topology.go index d72879035..055d273cd 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -5,6 +5,7 @@ import ( "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "errors" + "github.com/goraft/raft" "io/ioutil" "math/rand" ) @@ -12,8 +13,6 @@ import ( type Topology struct { NodeImpl - IsLeader bool - collectionMap map[string]*Collection pulse int64 @@ -27,6 +26,8 @@ type Topology struct { chanFullVolumes chan storage.VolumeInfo configuration *Configuration + + RaftServer raft.Server } func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { @@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL return t, err } +func (t *Topology) IsLeader() bool { + return t.RaftServer == nil || t.Leader() == t.RaftServer.Name() +} + +func (t *Topology) Leader() string { + l := "" + if t.RaftServer != nil { + l = t.RaftServer.Leader() + } + + if l == "" { + // We are a single node cluster, we are the leader + return t.RaftServer.Name() + } + + return l +} + func (t *Topology) loadConfiguration(configurationFile string) error { b, e := ioutil.ReadFile(configurationFile) if e == nil { @@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { func (t *Topology) NextVolumeId() storage.VolumeId { vid := t.GetMaxVolumeId() - return vid.Next() + next := vid.Next() + go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) + return next } func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 710e7b2ae..7398ff9bf 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -10,7 +10,7 @@ import ( func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { go func() { for { - if t.IsLeader { + if t.IsLeader() { freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) } @@ -19,7 +19,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() go func(garbageThreshold string) { c := time.Tick(15 * time.Minute) - if t.IsLeader { + if t.IsLeader() { for _ = range c { t.Vacuum(garbageThreshold) } |
