aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/disk_location.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/disk_location.go')
-rw-r--r--weed/storage/disk_location.go25
1 files changed, 15 insertions, 10 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index a3f0b6585..02f5f5923 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -134,7 +134,7 @@ func getValidVolumeName(basename string) string {
return ""
}
-func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64) bool {
+func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind NeedleMapKind, skipIfEcVolumesExists bool, ldbTimeout int64, diskId uint32) bool {
basename := dirEntry.Name()
if dirEntry.IsDir() {
return false
@@ -184,15 +184,16 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false
}
+ v.diskId = diskId // Set the disk ID for existing volumes
l.SetVolume(vid, v)
size, _, _ := v.FileStat()
- glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s disk_id=%d",
+ l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String(), diskId)
return true
}
-func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64) {
+func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int, ldbTimeout int64, diskId uint32) {
task_queue := make(chan os.DirEntry, 10*concurrency)
go func() {
@@ -218,7 +219,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
go func() {
defer wg.Done()
for fi := range task_queue {
- _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout)
+ _ = l.loadExistingVolume(fi, needleMapKind, true, ldbTimeout, diskId)
}
}()
}
@@ -227,6 +228,10 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
}
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeout int64) {
+ l.loadExistingVolumesWithId(needleMapKind, ldbTimeout, 0) // Default disk ID for backward compatibility
+}
+
+func (l *DiskLocation) loadExistingVolumesWithId(needleMapKind NeedleMapKind, ldbTimeout int64, diskId uint32) {
workerNum := runtime.NumCPU()
val, ok := os.LookupEnv("GOMAXPROCS")
@@ -242,11 +247,11 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind, ldbTimeo
workerNum = 10
}
}
- l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout)
- glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
+ l.concurrentLoadingVolumes(needleMapKind, workerNum, ldbTimeout, diskId)
+ glog.V(0).Infof("Store started on dir: %s with %d volumes max %d (disk ID: %d)", l.Directory, len(l.volumes), l.MaxVolumeCount, diskId)
l.loadAllEcShards()
- glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
+ glog.V(0).Infof("Store started on dir: %s with %d ec shards (disk ID: %d)", l.Directory, len(l.ecVolumes), diskId)
}
@@ -310,9 +315,9 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId, onlyEmpty bool) (fo
return
}
-func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
+func (l *DiskLocation) LoadVolume(diskId uint32, vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
if fileInfo, found := l.LocateVolume(vid); found {
- return l.loadExistingVolume(fileInfo, needleMapKind, false, 0)
+ return l.loadExistingVolume(fileInfo, needleMapKind, false, 0, diskId)
}
return false
}