aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/node.go')
-rw-r--r--weed/topology/node.go22
1 files changed, 18 insertions, 4 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go
index a42146339..1380e79f8 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -6,6 +6,7 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -249,15 +250,28 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
dn := c.(*DataNode) //can not cast n to DataNode
dn.RLock()
for _, v := range dn.GetVolumes() {
+ topo := n.GetTopology()
+ diskType := types.ToDiskType(v.DiskType)
+ vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
+
if v.Size >= volumeSizeLimit {
- //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- v
+ vl.accessLock.RLock()
+ vacuumTime, ok := vl.vacuumedVolumes[v.Id]
+ vl.accessLock.RUnlock()
+
+ // If a volume has been vacuumed in the past 20 seconds, we do not check whether it has reached full capacity.
+ // After 20s(grpc timeout), theoretically all the heartbeats of the volume server have reached the master,
+ // the volume size should be correct, not the size before the vacuum.
+ if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
+ //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+ topo.chanFullVolumes <- v
+ }
} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
- n.GetTopology().chanCrowdedVolumes <- v
+ topo.chanCrowdedVolumes <- v
}
copyCount := v.ReplicaPlacement.GetCopyCount()
if copyCount > 1 {
- if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
+ if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
} else {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)