aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
authorguosj <515878133@qq.com>2022-04-19 09:26:06 +0800
committerguosj <515878133@qq.com>2022-04-19 09:26:06 +0800
commit94c702402e879843792acc4be2cf01198268f250 (patch)
tree593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/topology
parent5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff)
parent82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (diff)
downloadseaweedfs-94c702402e879843792acc4be2cf01198268f250.tar.xz
seaweedfs-94c702402e879843792acc4be2cf01198268f250.zip
Merge branch 'chrislusf-master'
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/cluster_commands.go20
-rw-r--r--weed/topology/data_node_ec.go4
-rw-r--r--weed/topology/topology.go36
-rw-r--r--weed/topology/topology_event_handling.go6
-rw-r--r--weed/topology/topology_vacuum.go14
-rw-r--r--weed/topology/volume_layout.go7
6 files changed, 76 insertions, 11 deletions
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
index 152691ccb..1bcc6b449 100644
--- a/weed/topology/cluster_commands.go
+++ b/weed/topology/cluster_commands.go
@@ -1,9 +1,12 @@
package topology
import (
+ "encoding/json"
+ "fmt"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ hashicorpRaft "github.com/hashicorp/raft"
)
type MaxVolumeIdCommand struct {
@@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId"
}
+// deprecatedCommandApply represents the old interface to apply a command to the server.
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId()
@@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
return nil, nil
}
+
+func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error {
+ b, err := json.Marshal(s)
+ if err != nil {
+ return fmt.Errorf("marshal: %v", err)
+ }
+ _, err = sink.Write(b)
+ if err != nil {
+ sink.Cancel()
+ return fmt.Errorf("sink.Write(): %v", err)
+ }
+ return sink.Close()
+}
+
+func (s *MaxVolumeIdCommand) Release() {
+}
diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go
index 330b16b24..bf72fa9af 100644
--- a/weed/topology/data_node_ec.go
+++ b/weed/topology/data_node_ec.go
@@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
}
for _, ecShards := range actualShards {
- if dn.hasEcShards(ecShards.VolumeId) {
+ if dn.HasEcShards(ecShards.VolumeId) {
continue
}
@@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo)
return
}
-func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) {
+func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) {
dn.RLock()
defer dn.RUnlock()
for _, c := range dn.children {
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 207c89ad7..aacdfa7d2 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -1,6 +1,7 @@
package topology
import (
+ "encoding/json"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -10,6 +11,7 @@ import (
"time"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -40,7 +42,8 @@ type Topology struct {
Configuration *Configuration
- RaftServer raft.Server
+ RaftServer raft.Server
+ HashicorpRaft *hashicorpRaft.Raft
}
func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
@@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool {
return true
}
}
+ } else if t.HashicorpRaft != nil {
+ if t.HashicorpRaft.State() == hashicorpRaft.Leader {
+ return true
+ }
}
return false
}
@@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) {
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
l = pb.ServerAddress(t.RaftServer.Leader())
+ } else if t.HashicorpRaft != nil {
+ l = pb.ServerAddress(t.HashicorpRaft.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
@@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*
func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
vid := t.GetMaxVolumeId()
next := vid.Next()
- if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
- return 0, err
+ if t.RaftServer != nil {
+ if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
+ return 0, err
+ }
+ } else if t.HashicorpRaft != nil {
+ b, err := json.Marshal(NewMaxVolumeIdCommand(next))
+ if err != nil {
+ return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
+ }
+ if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil {
+ return 0, future.Error()
+ }
}
return next, nil
}
@@ -283,3 +302,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume
return
}
+
+func (t *Topology) DataNodeRegistration(dcName, rackName string ,dn *DataNode){
+ if dn.Parent() != nil{
+ return
+ }
+ // registration to topo
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ rack.LinkChildNode(dn)
+ glog.Infof("[%s] reLink To topo ", dn.Id())
+} \ No newline at end of file
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 0f1db74df..fe3717233 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
package topology
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
"math/rand"
@@ -24,7 +25,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate)
}
}
}(garbageThreshold)
@@ -84,7 +85,8 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
negativeUsages := dn.GetDiskUsages().negative()
dn.UpAdjustDiskUsageDelta(negativeUsages)
-
+ dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes())
+ dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards())
if dn.Parent() != nil {
dn.Parent().UnlinkChildNode(dn.Id())
}
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 74d70bcdb..147220f4a 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -60,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
}
return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
+
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
locationlist *VolumeLocationList, preallocate int64) bool {
vl.accessLock.Lock()
@@ -116,6 +117,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
}
return isVacuumSuccess
}
+
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
@@ -144,6 +146,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
}
return isCommitSuccess
}
+
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
@@ -161,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *
}
}
-func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) {
// if there is vacuum going on, return immediately
swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
@@ -172,12 +175,19 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float
// now only one vacuum process going on
- glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
+ glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
+ garbageThreshold, collection, volumeId)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
+ if collection != "" && collection != c.Name {
+ continue
+ }
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
+ if volumeId > 0 && volumeLayout.Lookup(needle.VolumeId(volumeId)) == nil {
+ continue
+ }
t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index de840f18f..167aee8ea 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -440,8 +440,11 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- glog.V(0).Infof("Volume %d reaches full capacity.", vid)
- return vl.removeFromWritable(vid)
+ wasWritable := vl.removeFromWritable(vid)
+ if wasWritable {
+ glog.V(0).Infof("Volume %d reaches full capacity.", vid)
+ }
+ return wasWritable
}
func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {