aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/node.go22
-rw-r--r--weed/topology/topology_vacuum.go26
-rw-r--r--weed/topology/volume_layout.go9
3 files changed, 45 insertions, 12 deletions
diff --git a/weed/topology/node.go b/weed/topology/node.go
index a42146339..1380e79f8 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -6,6 +6,7 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -249,15 +250,28 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
dn := c.(*DataNode) //can not cast n to DataNode
dn.RLock()
for _, v := range dn.GetVolumes() {
+ topo := n.GetTopology()
+ diskType := types.ToDiskType(v.DiskType)
+ vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
+
if v.Size >= volumeSizeLimit {
- //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- v
+ vl.accessLock.RLock()
+ vacuumTime, ok := vl.vacuumedVolumes[v.Id]
+ vl.accessLock.RUnlock()
+
+ // Skip the full-capacity check for a volume that was vacuumed within the last 20 seconds.
+ // After 20s (the gRPC timeout), all heartbeats from the volume server should theoretically have reached the master,
+ // so the reported volume size should be the correct post-vacuum size rather than the size before the vacuum.
+ if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
+ //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+ topo.chanFullVolumes <- v
+ }
} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
- n.GetTopology().chanCrowdedVolumes <- v
+ topo.chanCrowdedVolumes <- v
}
copyCount := v.ReplicaPlacement.GetCopyCount()
if copyCount > 1 {
- if copyCount > len(n.GetTopology().Lookup(v.Collection, v.Id)) {
+ if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
} else {
stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 1581d52fb..c8ebdb249 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -123,14 +123,20 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
+ isFullCapacity := false
for _, dn := range vacuumLocationList.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
- if resp != nil && resp.IsReadOnly {
- isReadOnly = true
+ if resp != nil {
+ if resp.IsReadOnly {
+ isReadOnly = true
+ }
+ if resp.VolumeSize > t.volumeSizeLimit {
+ isFullCapacity = true
+ }
}
return err
})
@@ -157,8 +163,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(vid),
})
- if resp != nil && resp.IsReadOnly {
- isReadOnly = true
+ if resp != nil {
+ if resp.IsReadOnly {
+ isReadOnly = true
+ }
+ if resp.VolumeSize > t.volumeSizeLimit {
+ isFullCapacity = true
+ }
}
return err
})
@@ -187,8 +198,13 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
dn.Unlock()
}
+ // Record the vacuum time of the volume.
+ vl.accessLock.Lock()
+ vl.vacuumedVolumes[vid] = time.Now()
+ vl.accessLock.Unlock()
+
for _, dn := range vacuumLocationList.list {
- vl.SetVolumeAvailable(dn, vid, isReadOnly)
+ vl.SetVolumeAvailable(dn, vid, isReadOnly, isFullCapacity)
}
}
return isCommitSuccess
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index d1bc28ef8..f8ea28255 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -3,12 +3,13 @@ package topology
import (
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
"math/rand"
"sync"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -114,6 +115,7 @@ type VolumeLayout struct {
crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes
+ vacuumedVolumes map[needle.VolumeId]time.Time
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
@@ -135,6 +137,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
+ vacuumedVolumes: make(map[needle.VolumeId]time.Time),
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
}
@@ -436,7 +439,7 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
}
return false
}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly, isFullCapacity bool) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -447,7 +450,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is
vl.vid2location[vid].Set(dn)
- if vInfo.ReadOnly || isReadOnly {
+ if vInfo.ReadOnly || isReadOnly || isFullCapacity {
return false
}