aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-03-15 23:03:49 -0700
committerChris Lu <chris.lu@gmail.com>2014-03-15 23:03:49 -0700
commit41143b3b78a1021111edc28a27f76ae2529f99a8 (patch)
tree5508f149f59363f5bdab3c16a23c02d57d4e310d /go/topology
parentfb75fe852c5282cef436329bb0700d4d0e09f511 (diff)
downloadseaweedfs-41143b3b78a1021111edc28a27f76ae2529f99a8.tar.xz
seaweedfs-41143b3b78a1021111edc28a27f76ae2529f99a8.zip
toughen weedfs clustering, adding synchronizing max volume id among
peers in order to avoid the same volume id being assigned twice 1. moving raft.Server to topology 2. adding max volume id command for raft
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/cluster_commands.go31
-rw-r--r--go/topology/topology.go27
-rw-r--r--go/topology/topology_event_handling.go4
3 files changed, 57 insertions, 5 deletions
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
new file mode 100644
index 000000000..dc0a40c8d
--- /dev/null
+++ b/go/topology/cluster_commands.go
@@ -0,0 +1,31 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "github.com/goraft/raft"
+)
+
+type MaxVolumeIdCommand struct {
+ MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+}
+
+func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+ return &MaxVolumeIdCommand{
+ MaxVolumeId: value,
+ }
+}
+
+func (c *MaxVolumeIdCommand) CommandName() string {
+ return "MaxVolumeId"
+}
+
+func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
+ topo := server.Context().(*Topology)
+ before := topo.GetMaxVolumeId()
+ topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
+
+ glog.V(0).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
+
+ return nil, nil
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index d72879035..055d273cd 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -5,6 +5,7 @@ import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
+ "github.com/goraft/raft"
"io/ioutil"
"math/rand"
)
@@ -12,8 +13,6 @@ import (
type Topology struct {
NodeImpl
- IsLeader bool
-
collectionMap map[string]*Collection
pulse int64
@@ -27,6 +26,8 @@ type Topology struct {
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
+
+ RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
@@ -50,6 +51,24 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
return t, err
}
+func (t *Topology) IsLeader() bool {
+ return t.RaftServer == nil || t.Leader() == t.RaftServer.Name()
+}
+
+func (t *Topology) Leader() string {
+ l := ""
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ }
+
+ if l == "" {
+ // We are a single node cluster, we are the leader
+ return t.RaftServer.Name()
+ }
+
+ return l
+}
+
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
@@ -79,7 +98,9 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
- return vid.Next()
+ next := vid.Next()
+ go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+ return next
}
func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 710e7b2ae..7398ff9bf 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -10,7 +10,7 @@ import (
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() {
for {
- if t.IsLeader {
+ if t.IsLeader() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
@@ -19,7 +19,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
- if t.IsLeader {
+ if t.IsLeader() {
for _ = range c {
t.Vacuum(garbageThreshold)
}