| author | bingoohuang <bingoo.huang@gmail.com> | 2019-07-16 11:13:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-16 11:13:23 +0800 |
| commit | d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch) | |
| tree | 8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/topology/topology.go | |
| parent | 01060c992591f412b0d5e180bde29991747a9462 (diff) | |
| parent | 5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff) | |
Keep updated from the original (#1)
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 96 |
1 file changed, 84 insertions(+), 12 deletions(-)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4242bfa05..aa01190c9 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -2,20 +2,25 @@ package topology
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
+	"sync"
 
 	"github.com/chrislusf/raft"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/sequence"
 	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 type Topology struct {
 	NodeImpl
 
-	collectionMap *util.ConcurrentReadMap
+	collectionMap  *util.ConcurrentReadMap
+	ecShardMap     map[needle.VolumeId]*EcShardLocations
+	ecShardMapLock sync.RWMutex
 
 	pulse int64
@@ -37,6 +42,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
 	t.NodeImpl.value = t
 	t.children = make(map[NodeId]Node)
 	t.collectionMap = util.NewConcurrentReadMap()
+	t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
 	t.pulse = int64(pulse)
 	t.volumeSizeLimit = volumeSizeLimit
 
@@ -50,8 +56,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
 }
 
 func (t *Topology) IsLeader() bool {
-	if leader, e := t.Leader(); e == nil {
-		return leader == t.RaftServer.Name()
+	if t.RaftServer != nil {
+		return t.RaftServer.State() == raft.Leader
 	}
 	return false
 }
@@ -72,7 +78,7 @@ func (t *Topology) Leader() (string, error) {
 	return l, nil
 }
 
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
 	//maybe an issue if lots of collections?
 	if collection == "" {
 		for _, c := range t.collectionMap.Items() {
@@ -85,14 +91,24 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
 			return c.(*Collection).Lookup(vid)
 		}
 	}
+
+	if locations, found := t.LookupEcShards(vid); found {
+		for _, loc := range locations.Locations {
+			dataNodes = append(dataNodes, loc...)
+		}
+		return dataNodes
+	}
+
 	return nil
 }
 
-func (t *Topology) NextVolumeId() storage.VolumeId {
+func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
 	vid := t.GetMaxVolumeId()
 	next := vid.Next()
-	go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
-	return next
+	if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+		return 0, err
+	}
+	return next, nil
 }
 
 func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
@@ -102,19 +118,43 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
 
 func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
 	vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
-	if err != nil || datanodes.Length() == 0 {
-		return "", 0, nil, errors.New("No writable volumes available!")
+	if err != nil {
+		return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
+	}
+	if datanodes.Length() == 0 {
+		return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
 	}
 	fileId, count := t.Sequence.NextFileId(count)
-	return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+	return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
 }
 
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
 	return t.collectionMap.Get(collectionName, func() interface{} {
 		return NewCollection(collectionName, t.volumeSizeLimit)
 	}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
 }
 
+func (t *Topology) ListCollections(includeNormalVolumes, includeEcVolumes bool) (ret []string) {
+
+	mapOfCollections := make(map[string]bool)
+	for _, c := range t.collectionMap.Items() {
+		mapOfCollections[c.(*Collection).Name] = true
+	}
+
+	if includeEcVolumes {
+		t.ecShardMapLock.RLock()
+		for _, ecVolumeLocation := range t.ecShardMap {
+			mapOfCollections[ecVolumeLocation.Collection] = true
+		}
+		t.ecShardMapLock.RUnlock()
+	}
+
+	for k := range mapOfCollections {
+		ret = append(ret, k)
+	}
+	return ret
+}
+
 func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
 	c, hasCollection := t.collectionMap.Find(collectionName)
 	if !hasCollection {
@@ -152,6 +192,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
 }
 
 func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
+	// convert into in memory struct storage.VolumeInfo
 	var volumeInfos []storage.VolumeInfo
 	for _, v := range volumes {
 		if vi, err := storage.NewVolumeInfo(v); err == nil {
@@ -160,8 +201,9 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
 			glog.V(0).Infof("Fail to convert joined volume information: %v", err)
 		}
 	}
+	// find out the delta volumes
 	newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
-	for _, v := range volumeInfos {
+	for _, v := range newVolumes {
 		t.RegisterVolumeLayout(v, dn)
 	}
 	for _, v := range deletedVolumes {
@@ -169,3 +211,33 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
 	}
 	return
 }
+
+func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolumes []*master_pb.VolumeShortInformationMessage, dn *DataNode) {
+	var newVis, oldVis []storage.VolumeInfo
+	for _, v := range newVolumes {
+		vi, err := storage.NewVolumeInfoFromShort(v)
+		if err != nil {
+			glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+			continue
+		}
+		newVis = append(newVis, vi)
+	}
+	for _, v := range deletedVolumes {
+		vi, err := storage.NewVolumeInfoFromShort(v)
+		if err != nil {
+			glog.V(0).Infof("NewVolumeInfoFromShort %v: %v", v, err)
+			continue
+		}
+		oldVis = append(oldVis, vi)
+	}
+	dn.DeltaUpdateVolumes(newVis, oldVis)
+
+	for _, vi := range newVis {
+		t.RegisterVolumeLayout(vi, dn)
+	}
+	for _, vi := range oldVis {
+		t.UnRegisterVolumeLayout(vi, dn)
+	}
+
+	return
+}
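The erasure-coding pieces this diff leans on (`EcShardLocations`, `LookupEcShards`) are defined outside `topology.go`. For context, below is a minimal sketch of the shape the new code assumes, written to match the fields introduced above; the `erasure_coding.TotalShardsCount` constant and the exact method body are assumptions based on the companion `topology_ec.go`, not part of this commit.

```go
package topology

import (
	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

// EcShardLocations records, for one erasure-coded volume, which data nodes
// hold each shard. Collection is what the new ListCollections reads when
// includeEcVolumes is true.
type EcShardLocations struct {
	Collection string
	Locations  [erasure_coding.TotalShardsCount][]*DataNode // assumed shard-indexed layout
}

// LookupEcShards is the read path used by the new branch in Topology.Lookup.
// It takes only the read lock, so lookups can proceed concurrently with
// other readers; writers to ecShardMap take the write lock.
func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
	t.ecShardMapLock.RLock()
	defer t.ecShardMapLock.RUnlock()

	locations, found = t.ecShardMap[vid]
	return
}
```

Given that shape, the new branch in `Lookup` simply flattens the per-shard node lists into one `[]*DataNode`, so callers see the same return type whether a volume is replicated or erasure-coded.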
