author    yulai.li <blacktear23@gmail.com>  2022-06-26 22:43:37 +0800
committer yulai.li <blacktear23@gmail.com>  2022-06-26 22:43:37 +0800
commit    46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree      734125b48b6d96f8796a2b89b924312cd169ef0e /weed/topology/topology.go
parent    a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parent    dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
Update tikv client version and add 1PC (one-phase commit) support
Diffstat (limited to 'weed/topology/topology.go')
-rw-r--r--  weed/topology/topology.go | 56
1 file changed, 45 insertions(+), 11 deletions(-)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 4cbe22a42..631c1fa29 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,14 +1,18 @@
package topology
import (
+ "encoding/json"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -39,7 +43,10 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
+ RaftServer raft.Server
+ HashicorpRaft *hashicorpRaft.Raft
+ UuidAccessLock sync.RWMutex
+ UuidMap map[string][]string
}
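
The new UuidAccessLock/UuidMap pair suggests the usual RWMutex-guarded registry pattern. A minimal sketch of that pattern, assuming helper names (uuidRegistry, add, lookup) that are not part of this patch:

package main

import (
	"fmt"
	"sync"
)

// uuidRegistry is a hypothetical stand-in for the UuidAccessLock/UuidMap
// fields added to Topology: a map of volume-server location to uuids,
// guarded by a sync.RWMutex.
type uuidRegistry struct {
	mu    sync.RWMutex
	uuids map[string][]string
}

func (r *uuidRegistry) add(location, uuid string) {
	r.mu.Lock() // writers take the exclusive lock
	defer r.mu.Unlock()
	r.uuids[location] = append(r.uuids[location], uuid)
}

func (r *uuidRegistry) lookup(location string) []string {
	r.mu.RLock() // readers may proceed concurrently
	defer r.mu.RUnlock()
	return r.uuids[location]
}

func main() {
	reg := &uuidRegistry{uuids: map[string][]string{}}
	reg.add("127.0.0.1:8080", "uuid-1")
	fmt.Println(reg.lookup("127.0.0.1:8080"))
}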
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -71,19 +78,25 @@ func (t *Topology) IsLeader() bool {
return true
}
if leader, err := t.Leader(); err == nil {
- if t.RaftServer.Name() == leader {
+ if pb.ServerAddress(t.RaftServer.Name()) == leader {
return true
}
}
+ } else if t.HashicorpRaft != nil {
+ if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+ return true
+ }
}
return false
}
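
For the hashicorp/raft branch above, leadership is a local state check rather than a leader-name comparison. A minimal sketch against the library's real State() API and Leader state constant, with all bootstrap wiring omitted:

package main

import (
	"fmt"

	hashicorpRaft "github.com/hashicorp/raft"
)

// isLeader mirrors the new branch in Topology.IsLeader: hashicorp/raft
// exposes the node's own role directly.
func isLeader(r *hashicorpRaft.Raft) bool {
	// State() reports Follower, Candidate, Leader, or Shutdown.
	return r.State() == hashicorpRaft.Leader
}

func main() {
	var r *hashicorpRaft.Raft // assumed bootstrapped elsewhere
	if r != nil && isLeader(r) {
		fmt.Println("this node is the leader")
	}
}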
-func (t *Topology) Leader() (string, error) {
- l := ""
+func (t *Topology) Leader() (pb.ServerAddress, error) {
+ var l pb.ServerAddress
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
- l = t.RaftServer.Leader()
+ l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
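
Both backends report the leader address as a string-based type, which the patch normalizes to pb.ServerAddress. A compilable sketch of that normalization, where serverAddress stands in for pb.ServerAddress and the goraft server is reduced to the one method used here:

package main

import (
	"errors"
	"fmt"

	hashicorpRaft "github.com/hashicorp/raft"
)

// serverAddress is a stand-in for pb.ServerAddress.
type serverAddress string

// goraftServer reduces github.com/chrislusf/raft's Server interface to the
// single method Leader() needs.
type goraftServer interface {
	Leader() string
}

func leader(old goraftServer, hr *hashicorpRaft.Raft) (serverAddress, error) {
	switch {
	case old != nil:
		return serverAddress(old.Leader()), nil
	case hr != nil:
		// hashicorp/raft's Leader() returns its own ServerAddress string type.
		return serverAddress(hr.Leader()), nil
	default:
		return "", errors.New("Raft Server not ready yet!")
	}
}

func main() {
	if l, err := leader(nil, nil); err != nil {
		fmt.Println("no leader:", err)
	} else {
		fmt.Println("leader:", l)
	}
}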
@@ -123,8 +136,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
- return 0, err
+ if t.RaftServer != nil {
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ } else if t.HashicorpRaft != nil {
+ b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+ if err != nil {
+ return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+ }
+ if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+ return 0, future.Error()
+ }
}
return next, nil
}
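
The two backends differ in how a command reaches the state machine: goraft's Do takes a typed command, while hashicorp/raft's Apply takes raw bytes plus a timeout and returns an ApplyFuture, which is why NextVolumeId now marshals the command to JSON first. A minimal sketch of the Apply path; maxVolumeIdCommand is a hypothetical stand-in for what NewMaxVolumeIdCommand builds:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	hashicorpRaft "github.com/hashicorp/raft"
)

// maxVolumeIdCommand stands in for the command NewMaxVolumeIdCommand returns.
type maxVolumeIdCommand struct {
	MaxVolumeId uint32 `json:"maxVolumeId"`
}

func applyMaxVolumeId(r *hashicorpRaft.Raft, next uint32) error {
	b, err := json.Marshal(maxVolumeIdCommand{MaxVolumeId: next})
	if err != nil {
		return fmt.Errorf("failed marshal maxVolumeIdCommand: %+v", err)
	}
	// Apply appends the entry to the raft log; the future resolves once the
	// entry is committed and handed to the FSM, or the timeout expires.
	future := r.Apply(b, time.Second)
	return future.Error()
}

func main() {
	var r *hashicorpRaft.Raft // assumed bootstrapped as the leader elsewhere
	if r != nil {
		fmt.Println(applyMaxVolumeId(r, 42))
	}
}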
@@ -136,7 +159,7 @@ func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
return active > 0
}
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
+func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
@@ -145,7 +168,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
fileId := t.Sequence.NextFileId(count)
- return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+ return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil
}
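
Returning the whole VolumeLocationList instead of datanodes.Head() lets callers see every replica for the picked volume, not only the first. A toy sketch of what that buys a caller; locationList is a simplified stand-in for topology.VolumeLocationList, with only Head and Length mirroring methods on the real type:

package main

import "fmt"

type dataNode struct{ Url string }

// locationList is a simplified stand-in for topology.VolumeLocationList.
type locationList struct{ list []*dataNode }

func (ll *locationList) Head() *dataNode { return ll.list[0] }
func (ll *locationList) Length() int     { return len(ll.list) }

func main() {
	locations := &locationList{list: []*dataNode{
		{Url: "dn1:8080"},
		{Url: "dn2:8080"},
	}}
	// Before the patch a caller saw only Head(); now it can fan out to
	// every replica location.
	for _, dn := range locations.list {
		fmt.Println("replica at", dn.Url)
	}
	fmt.Println("primary:", locations.Head().Url, "of", locations.Length())
}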
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
@@ -205,7 +228,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- glog.Infof("removing volume info: %+v", v)
+ glog.Infof("removing volume info: %+v from %v", v, dn.id)
diskType := types.ToDiskType(v.DiskType)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
volumeLayout.UnRegisterVolume(&v, dn)
@@ -282,3 +305,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
return
}
+
+func (t *Topology) DataNodeRegistration(dcName, rackName string, dn *DataNode) {
+ if dn.Parent() != nil {
+ return
+ }
+ // registration to topo
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ rack.LinkChildNode(dn)
+ glog.Infof("[%s] reLink To topo ", dn.Id())
+}
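
DataNodeRegistration re-links an orphaned DataNode under its data center and rack, skipping nodes that already have a parent. A toy sketch of that link-once pattern; the node type here is hypothetical, not the package's actual Node hierarchy:

package main

import "fmt"

// node is a hypothetical stand-in for the DataCenter/Rack/DataNode tree.
type node struct {
	id       string
	parent   *node
	children map[string]*node
}

func newNode(id string) *node {
	return &node{id: id, children: map[string]*node{}}
}

// getOrCreate mirrors GetOrCreateDataCenter / GetOrCreateRack.
func (n *node) getOrCreate(id string) *node {
	if c, ok := n.children[id]; ok {
		return c
	}
	c := newNode(id)
	c.parent = n
	n.children[id] = c
	return c
}

// register mirrors DataNodeRegistration: a node that already has a parent
// is already linked into the topology and is left alone.
func register(topo *node, dcName, rackName string, dn *node) {
	if dn.parent != nil {
		return
	}
	rack := topo.getOrCreate(dcName).getOrCreate(rackName)
	rack.children[dn.id] = dn
	dn.parent = rack
	fmt.Printf("[%s] reLink To topo\n", dn.id)
}

func main() {
	topo := newNode("topo")
	register(topo, "dc1", "rack1", newNode("127.0.0.1:8080"))
}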