diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/topology/topology.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/topology/topology.go')
| -rw-r--r-- | weed/topology/topology.go | 56 |
1 files changed, 45 insertions, 11 deletions
diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 4cbe22a42..631c1fa29 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -1,14 +1,18 @@ package topology import ( + "encoding/json" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "sync" "time" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -39,7 +43,10 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server + RaftServer raft.Server + HashicorpRaft *hashicorpRaft.Raft + UuidAccessLock sync.RWMutex + UuidMap map[string][]string } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -71,19 +78,25 @@ func (t *Topology) IsLeader() bool { return true } if leader, err := t.Leader(); err == nil { - if t.RaftServer.Name() == leader { + if pb.ServerAddress(t.RaftServer.Name()) == leader { return true } } + } else if t.HashicorpRaft != nil { + if t.HashicorpRaft.State() == hashicorpRaft.Leader { + return true + } } return false } -func (t *Topology) Leader() (string, error) { - l := "" +func (t *Topology) Leader() (pb.ServerAddress, error) { + var l pb.ServerAddress for count := 0; count < 3; count++ { if t.RaftServer != nil { - l = t.RaftServer.Leader() + l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) } else { return "", errors.New("Raft Server not ready yet!") } @@ -123,8 +136,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { - return 0, err + if t.RaftServer != nil { + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + } else if t.HashicorpRaft != nil { + b, err := json.Marshal(NewMaxVolumeIdCommand(next)) + if err != nil { + return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err) + } + if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil { + return 0, future.Error() + } } return next, nil } @@ -136,7 +159,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { return active > 0 } -func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { +func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) { vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option) if err != nil { return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err) @@ -145,7 +168,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String()) } fileId := t.Sequence.NextFileId(count) - return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil + return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil } func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout { @@ -205,7 +228,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info: %+v", v) + glog.Infof("removing volume info: %+v from %v", v, dn.id) diskType := types.ToDiskType(v.DiskType) volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) @@ -282,3 +305,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume return } + +func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) { + if dn.Parent() != nil { + return + } + // registration to topo + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + rack.LinkChildNode(dn) + glog.Infof("[%s] reLink To topo ", dn.Id()) +} |
