aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/topology/disk.go13
-rw-r--r--weed/topology/topology_test.go114
2 files changed, 127 insertions, 0 deletions
diff --git a/weed/topology/disk.go b/weed/topology/disk.go
index 8ca25c244..f27589916 100644
--- a/weed/topology/disk.go
+++ b/weed/topology/disk.go
@@ -176,6 +176,19 @@ func (d *Disk) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool)
d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), deltaDiskUsage)
}
isChanged = d.volumes[v.Id].ReadOnly != v.ReadOnly
+ if isChanged {
+ // Adjust active volume count when ReadOnly status changes
+ // Use a separate delta object to avoid affecting other metric adjustments
+ readOnlyDelta := &DiskUsageCounts{}
+ if v.ReadOnly {
+ // Changed from writable to read-only
+ readOnlyDelta.activeVolumeCount = -1
+ } else {
+ // Changed from read-only to writable
+ readOnlyDelta.activeVolumeCount = 1
+ }
+ d.UpAdjustDiskUsageDelta(types.ToDiskType(v.DiskType), readOnlyDelta)
+ }
d.volumes[v.Id] = v
}
return
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 667e941df..8515d2f81 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -211,6 +211,120 @@ func TestAddRemoveVolume(t *testing.T) {
}
}
+func TestVolumeReadOnlyStatusChange(t *testing.T) {
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ maxVolumeCounts := make(map[string]uint32)
+ maxVolumeCounts[""] = 25
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+
+ // Create a writable volume
+ v := storage.VolumeInfo{
+ Id: needle.VolumeId(1),
+ Size: 100,
+ Collection: "",
+ DiskType: "",
+ FileCount: 10,
+ DeleteCount: 0,
+ DeletedByteCount: 0,
+ ReadOnly: false, // Initially writable
+ Version: needle.GetCurrentVersion(),
+ ReplicaPlacement: &super_block.ReplicaPlacement{},
+ Ttl: needle.EMPTY_TTL,
+ }
+
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+ topo.RegisterVolumeLayout(v, dn)
+
+ // Check initial active count (should be 1 since volume is writable)
+ usageCounts := topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
+ assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
+
+ // Change volume to read-only
+ v.ReadOnly = true
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+
+ // Check active count after marking read-only (should be 0)
+ usageCounts = topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "activeVolumeCount after read-only", int(usageCounts.activeVolumeCount), 0)
+
+ // Change volume back to writable
+ v.ReadOnly = false
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+
+ // Check active count after marking writable again (should be 1)
+ usageCounts = topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "activeVolumeCount after writable again", int(usageCounts.activeVolumeCount), 1)
+}
+
+func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ maxVolumeCounts := make(map[string]uint32)
+ maxVolumeCounts[""] = 25
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+
+ // Create a writable, local volume
+ v := storage.VolumeInfo{
+ Id: needle.VolumeId(1),
+ Size: 100,
+ Collection: "",
+ DiskType: "",
+ FileCount: 10,
+ DeleteCount: 0,
+ DeletedByteCount: 0,
+ ReadOnly: false, // Initially writable
+ RemoteStorageName: "", // Initially local
+ Version: needle.GetCurrentVersion(),
+ ReplicaPlacement: &super_block.ReplicaPlacement{},
+ Ttl: needle.EMPTY_TTL,
+ }
+
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+ topo.RegisterVolumeLayout(v, dn)
+
+ // Check initial counts
+ usageCounts := topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "initial activeVolumeCount", int(usageCounts.activeVolumeCount), 1)
+ assert(t, "initial remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
+
+ // Simultaneously change to read-only AND remote
+ v.ReadOnly = true
+ v.RemoteStorageName = "s3"
+ v.RemoteStorageKey = "key1"
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+
+ // Check counts after both changes
+ usageCounts = topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "activeVolumeCount after read-only+remote", int(usageCounts.activeVolumeCount), 0)
+ assert(t, "remoteVolumeCount after read-only+remote", int(usageCounts.remoteVolumeCount), 1)
+
+ // Change back to writable but keep remote
+ v.ReadOnly = false
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+
+ // Check counts - should be writable (active=1) and still remote
+ usageCounts = topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "activeVolumeCount after writable+remote", int(usageCounts.activeVolumeCount), 1)
+ assert(t, "remoteVolumeCount after writable+remote", int(usageCounts.remoteVolumeCount), 1)
+
+ // Change back to local AND read-only simultaneously
+ v.ReadOnly = true
+ v.RemoteStorageName = ""
+ v.RemoteStorageKey = ""
+ dn.UpdateVolumes([]storage.VolumeInfo{v})
+
+ // Check final counts
+ usageCounts = topo.diskUsages.usages[types.HardDriveType]
+ assert(t, "final activeVolumeCount", int(usageCounts.activeVolumeCount), 0)
+ assert(t, "final remoteVolumeCount", int(usageCounts.remoteVolumeCount), 0)
+}
+
func TestListCollections(t *testing.T) {
rp, _ := super_block.NewReplicaPlacementFromString("002")