diff options
| author | guosj <515878133@qq.com> | 2022-04-19 09:26:06 +0800 |
|---|---|---|
| committer | guosj <515878133@qq.com> | 2022-04-19 09:26:06 +0800 |
| commit | 94c702402e879843792acc4be2cf01198268f250 (patch) | |
| tree | 593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/topology | |
| parent | 5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff) | |
| parent | 82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (diff) | |
| download | seaweedfs-94c702402e879843792acc4be2cf01198268f250.tar.xz seaweedfs-94c702402e879843792acc4be2cf01198268f250.zip | |
Merge branch 'chrislusf-master'
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/cluster_commands.go | 20 | ||||
| -rw-r--r-- | weed/topology/data_node_ec.go | 4 | ||||
| -rw-r--r-- | weed/topology/topology.go | 36 | ||||
| -rw-r--r-- | weed/topology/topology_event_handling.go | 6 | ||||
| -rw-r--r-- | weed/topology/topology_vacuum.go | 14 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 7 |
6 files changed, 76 insertions, 11 deletions
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 152691ccb..1bcc6b449 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -1,9 +1,12 @@ package topology import ( + "encoding/json" + "fmt" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" + hashicorpRaft "github.com/hashicorp/raft" ) type MaxVolumeIdCommand struct { @@ -20,6 +23,7 @@ func (c *MaxVolumeIdCommand) CommandName() string { return "MaxVolumeId" } +// deprecatedCommandApply represents the old interface to apply a command to the server. func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { topo := server.Context().(*Topology) before := topo.GetMaxVolumeId() @@ -29,3 +33,19 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { return nil, nil } + +func (s *MaxVolumeIdCommand) Persist(sink hashicorpRaft.SnapshotSink) error { + b, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("marshal: %v", err) + } + _, err = sink.Write(b) + if err != nil { + sink.Cancel() + return fmt.Errorf("sink.Write(): %v", err) + } + return sink.Close() +} + +func (s *MaxVolumeIdCommand) Release() { +} diff --git a/weed/topology/data_node_ec.go b/weed/topology/data_node_ec.go index 330b16b24..bf72fa9af 100644 --- a/weed/topology/data_node_ec.go +++ b/weed/topology/data_node_ec.go @@ -58,7 +58,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) } for _, ecShards := range actualShards { - if dn.hasEcShards(ecShards.VolumeId) { + if dn.HasEcShards(ecShards.VolumeId) { continue } @@ -79,7 +79,7 @@ func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) return } -func (dn *DataNode) hasEcShards(volumeId needle.VolumeId) (found bool) { +func (dn *DataNode) HasEcShards(volumeId needle.VolumeId) (found bool) { dn.RLock() defer dn.RUnlock() for _, c := range dn.children { diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 207c89ad7..aacdfa7d2 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -1,6 +1,7 @@ package topology import ( + "encoding/json" "errors" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" @@ -10,6 +11,7 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -40,7 +42,8 @@ type Topology struct { Configuration *Configuration - RaftServer raft.Server + RaftServer raft.Server + HashicorpRaft *hashicorpRaft.Raft } func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology { @@ -76,6 +79,10 @@ func (t *Topology) IsLeader() bool { return true } } + } else if t.HashicorpRaft != nil { + if t.HashicorpRaft.State() == hashicorpRaft.Leader { + return true + } } return false } @@ -85,6 +92,8 @@ func (t *Topology) Leader() (pb.ServerAddress, error) { for count := 0; count < 3; count++ { if t.RaftServer != nil { l = pb.ServerAddress(t.RaftServer.Leader()) + } else if t.HashicorpRaft != nil { + l = pb.ServerAddress(t.HashicorpRaft.Leader()) } else { return "", errors.New("Raft Server not ready yet!") } @@ -124,8 +133,18 @@ func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []* func (t *Topology) NextVolumeId() (needle.VolumeId, error) { vid := t.GetMaxVolumeId() next := vid.Next() - if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { - return 0, err + if t.RaftServer != nil { + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + return 0, err + } + } else if t.HashicorpRaft != nil { + b, err := json.Marshal(NewMaxVolumeIdCommand(next)) + if err != nil { + return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err) + } + if future := t.HashicorpRaft.Apply(b, time.Second); future.Error() != nil { + return 0, future.Error() + } } return next, nil } @@ -283,3 +302,14 @@ func (t *Topology) IncrementalSyncDataNodeRegistration(newVolumes, deletedVolume return } + +func (t *Topology) DataNodeRegistration(dcName, rackName string ,dn *DataNode){ + if dn.Parent() != nil{ + return + } + // registration to topo + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + rack.LinkChildNode(dn) + glog.Infof("[%s] reLink To topo ", dn.Id()) +}
\ No newline at end of file diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 0f1db74df..fe3717233 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -1,6 +1,7 @@ package topology import ( + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/types" "google.golang.org/grpc" "math/rand" @@ -24,7 +25,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g c := time.Tick(15 * time.Minute) for _ = range c { if t.IsLeader() { - t.Vacuum(grpcDialOption, garbageThreshold, preallocate) + t.Vacuum(grpcDialOption, garbageThreshold, 0, "", preallocate) } } }(garbageThreshold) @@ -84,7 +85,8 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { negativeUsages := dn.GetDiskUsages().negative() dn.UpAdjustDiskUsageDelta(negativeUsages) - + dn.DeltaUpdateVolumes([]storage.VolumeInfo{}, dn.GetVolumes()) + dn.DeltaUpdateEcShards([]*erasure_coding.EcVolumeInfo{}, dn.GetEcShards()) if dn.Parent() != nil { dn.Parent().UnlinkChildNode(dn.Id()) } diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 74d70bcdb..147220f4a 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -60,6 +60,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne } return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0 } + func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool { vl.accessLock.Lock() @@ -116,6 +117,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * } return isVacuumSuccess } + func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool { isCommitSuccess := true isReadOnly := false @@ -144,6 +146,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V } return isCommitSuccess } + func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) @@ -161,7 +164,7 @@ func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl * } } -func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) { +func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, volumeId uint32, collection string, preallocate int64) { // if there is vacuum going on, return immediately swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1) @@ -172,12 +175,19 @@ func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float // now only one vacuum process going on - glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold) + glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d", + garbageThreshold, collection, volumeId) for _, col := range t.collectionMap.Items() { c := col.(*Collection) + if collection != "" && collection != c.Name { + continue + } for _, vl := range c.storageType2VolumeLayout.Items() { if vl != nil { volumeLayout := vl.(*VolumeLayout) + if volumeId > 0 && volumeLayout.Lookup(needle.VolumeId(volumeId)) == nil { + continue + } t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate) } } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index de840f18f..167aee8ea 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -440,8 +440,11 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() - glog.V(0).Infof("Volume %d reaches full capacity.", vid) - return vl.removeFromWritable(vid) + wasWritable := vl.removeFromWritable(vid) + if wasWritable { + glog.V(0).Infof("Volume %d reaches full capacity.", vid) + } + return wasWritable } func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) { |
