aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/data_node.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/data_node.go')
-rw-r--r--weed/topology/data_node.go16
1 files changed, 9 insertions, 7 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 760f14ded..ee0d37956 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "sync/atomic"
)
type DataNode struct {
@@ -53,14 +54,14 @@ func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
return disk
}
-func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
+func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
disk := dn.getOrCreateDisk(v.DiskType)
return disk.AddOrUpdateVolume(v)
}
// UpdateVolumes detects new/deleted/changed volumes on a volume server
// used in master to notify master clients of these changes.
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
@@ -93,12 +94,12 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
}
}
for _, v := range actualVolumes {
- isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
+ isNew, isChanged := dn.doAddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
- if isChangedRO {
- changeRO = append(changeRO, v)
+ if isChanged {
+ changedVolumes = append(changedVolumes, v)
}
}
return
@@ -141,12 +142,13 @@ func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
}
dt := types.ToDiskType(diskType)
currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
- if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
+ currentDiskUsageMaxVolumeCount := atomic.LoadInt64(&currentDiskUsage.maxVolumeCount)
+ if currentDiskUsageMaxVolumeCount == int64(maxVolumeCount) {
continue
}
disk := dn.getOrCreateDisk(dt.String())
deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
- deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
+ deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount
disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}