diff options
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 79 |
1 files changed, 54 insertions, 25 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index e6cb44727..08ebd24fd 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -3,8 +3,10 @@ package topology import ( "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" + "time" "github.com/chrislusf/raft" @@ -27,7 +29,8 @@ type Topology struct { pulse int64 - volumeSizeLimit uint64 + volumeSizeLimit uint64 + replicationAsMin bool Sequence sequence.Sequencer @@ -38,16 +41,18 @@ type Topology struct { RaftServer raft.Server } -func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology { +func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { t := &Topology{} t.id = NodeId(id) t.nodeType = "Topology" t.NodeImpl.value = t + t.diskUsages = newDiskUsages() t.children = make(map[NodeId]Node) t.collectionMap = util.NewConcurrentReadMap() t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit + t.replicationAsMin = replicationAsMin t.Sequence = seq @@ -60,29 +65,32 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls func (t *Topology) IsLeader() bool { if t.RaftServer != nil { - return t.RaftServer.State() == raft.Leader + if t.RaftServer.State() == raft.Leader { + return true + } } return false } func (t *Topology) Leader() (string, error) { l := "" - if t.RaftServer != nil { - l = t.RaftServer.Leader() - } else { - return "", errors.New("Raft Server not ready yet!") - } - - if l == "" { - // We are a single node cluster, we are the leader - return t.RaftServer.Name(), errors.New("Raft Server not initialized!") + for count := 0; count < 3; count++ { + if t.RaftServer != nil { + l = t.RaftServer.Leader() + } else { + return "", errors.New("Raft Server not ready yet!") + } + if l != "" { + break + } else { + time.Sleep(time.Duration(5+count) * time.Second) + } } - return l, nil } func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { - //maybe an issue if lots of collections? + // maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { if list := c.(*Collection).Lookup(vid); list != nil { @@ -115,12 +123,12 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { } func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) if err != nil { return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) } @@ -131,10 +139,10 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { return t.collectionMap.Get(collectionName, func() interface{} { - return NewCollection(collectionName, t.volumeSizeLimit) - }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) + return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl, diskType) } func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) { @@ -152,7 +160,7 @@ func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) t.ecShardMapLock.RUnlock() } - for k, _ := range mapOfCollections { + for k := range mapOfCollections { ret = append(ret, k) } return ret @@ -170,15 +178,30 @@ func (t *Topology) DeleteCollection(collectionName string) { t.collectionMap.Delete(collectionName) } +func (t *Topology) DeleteLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) { + collection, found := t.FindCollection(collectionName) + if !found { + return + } + collection.DeleteVolumeLayout(rp, ttl, diskType) + if len(collection.storageType2VolumeLayout.Items()) == 0 { + t.DeleteCollection(collectionName) + } +} + func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + vl.RegisterVolume(&v, dn) + vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info:%+v", v) - volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + glog.Infof("removing volume info: %+v", v) + diskType := types.ToDiskType(v.DiskType) + volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) if volumeLayout.isEmpty() { - t.DeleteCollection(v.Collection) + t.DeleteLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) } } @@ -205,13 +228,19 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati } } // find out the delta volumes - newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos) + var changedVolumes []storage.VolumeInfo + newVolumes, deletedVolumes, changedVolumes = dn.UpdateVolumes(volumeInfos) for _, v := range newVolumes { t.RegisterVolumeLayout(v, dn) } for _, v := range deletedVolumes { t.UnRegisterVolumeLayout(v, dn) } + for _, v := range changedVolumes { + diskType := types.ToDiskType(v.DiskType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + vl.EnsureCorrectWritables(&v) + } return } |
