diff options
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/node.go | 22 | ||||
| -rw-r--r-- | weed/topology/topology_vacuum.go | 26 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 9 |
3 files changed, 45 insertions, 12 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go index a42146339..1380e79f8 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -6,6 +6,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -249,15 +250,28 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi dn := c.(*DataNode) //can not cast n to DataNode dn.RLock() for _, v := range dn.GetVolumes() { + topo := n.GetTopology() + diskType := types.ToDiskType(v.DiskType) + vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) + if v.Size >= volumeSizeLimit { - //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) - n.GetTopology().chanFullVolumes <- v + vl.accessLock.RLock() + vacuumTime, ok := vl.vacuumedVolumes[v.Id] + vl.accessLock.RUnlock() + + // If a volume has been vacuumed in the past 20 seconds, we do not check whether it has reached full capacity. + // After 20s(grpc timeout), theoretically all the heartbeats of the volume server have reached the master, + // the volume size should be correct, not the size before the vacuum. + if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) { + //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) + topo.chanFullVolumes <- v + } } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold { - n.GetTopology().chanCrowdedVolumes <- v + topo.chanCrowdedVolumes <- v } copyCount := v.ReplicaPlacement.GetCopyCount() if copyCount > 1 { - if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) { + if copyCount > len(topo.Lookup(v.Collection, v.Id)) { stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1) } else { stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 1581d52fb..c8ebdb249 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -123,14 +123,20 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl * func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool { isCommitSuccess := true isReadOnly := false + isFullCapacity := false for _, dn := range vacuumLocationList.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) - if resp != nil && resp.IsReadOnly { - isReadOnly = true + if resp != nil { + if resp.IsReadOnly { + isReadOnly = true + } + if resp.VolumeSize > t.volumeSizeLimit { + isFullCapacity = true + } } return err }) @@ -157,8 +163,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ VolumeId: uint32(vid), }) - if resp != nil && resp.IsReadOnly { - isReadOnly = true + if resp != nil { + if resp.IsReadOnly { + isReadOnly = true + } + if resp.VolumeSize > t.volumeSizeLimit { + isFullCapacity = true + } } return err }) @@ -187,8 +198,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V dn.Unlock() } + //record vacuum time of volume + vl.accessLock.Lock() + vl.vacuumedVolumes[vid] = time.Now() + vl.accessLock.Unlock() + for _, dn := range vacuumLocationList.list { - vl.SetVolumeAvailable(dn, vid, isReadOnly) + vl.SetVolumeAvailable(dn, vid, isReadOnly, isFullCapacity) } } return isCommitSuccess diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index d1bc28ef8..f8ea28255 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -3,12 +3,13 @@ package topology import ( "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/storage/types" "math/rand" "sync" "sync/atomic" "time" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" @@ -114,6 +115,7 @@ type VolumeLayout struct { crowded map[needle.VolumeId]struct{} readonlyVolumes *volumesBinaryState // readonly volumes oversizedVolumes *volumesBinaryState // oversized volumes + vacuumedVolumes map[needle.VolumeId]time.Time volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex @@ -135,6 +137,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType crowded: make(map[needle.VolumeId]struct{}), readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), + vacuumedVolumes: make(map[needle.VolumeId]time.Time), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, } @@ -436,7 +439,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) } return false } -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool { +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly, isFullCapacity bool) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() @@ -447,7 +450,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is vl.vid2location[vid].Set(dn) - if vInfo.ReadOnly || isReadOnly { + if vInfo.ReadOnly || isReadOnly || isFullCapacity { return false } |
