aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-02-13 00:27:23 -0800
committerGitHub <noreply@github.com>2020-02-13 00:27:23 -0800
commit2cddc23ae821344c486b4561bb1fbe9beafc95e8 (patch)
tree9aa6302fafaf315968ff16bd86650f17a454f38a
parent62c34454d84dd012ffbc150932cc3476efcf88ea (diff)
parent7b3764fd9e2308bddb3373420958ddc3d470f08c (diff)
downloadseaweedfs-2cddc23ae821344c486b4561bb1fbe9beafc95e8.tar.xz
seaweedfs-2cddc23ae821344c486b4561bb1fbe9beafc95e8.zip
Merge pull request #1198 from stlpmo-jn/resolve_core_when_loadVolume_failed
when Store.MountVolume(), the volume server will be core
-rw-r--r--weed/storage/disk_location.go51
1 files changed, 30 insertions, 21 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index a12a68cbc..f15303282 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -50,29 +50,39 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
return collection, vol, err
}
-func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) {
+func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
vid, collection, err := l.volumeIdFromPath(fileInfo)
- if err == nil {
- l.volumesLock.RLock()
- _, found := l.volumes[vid]
- l.volumesLock.RUnlock()
- if !found {
- if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil {
- l.volumesLock.Lock()
- l.volumes[vid] = v
- l.volumesLock.Unlock()
- size, _, _ := v.FileStat()
- glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
- // println("volume", vid, "last append at", v.lastAppendAtNs)
- } else {
- glog.V(0).Infof("new volume %s error %s", name, e)
- }
- }
+ if err != nil {
+ glog.Warningf("get volume id failed, %s, err : %s", name, err)
+ return false
+ }
+
+ // void loading one volume more than once
+ l.volumesLock.RLock()
+ _, found := l.volumes[vid]
+ l.volumesLock.RUnlock()
+ if found {
+ glog.V(1).Infof("loaded volume, %v", vid)
+ return true
+ }
+
+ v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0)
+ if e != nil {
+ glog.V(0).Infof("new volume %s error %s", name, e)
+ return false
}
+
+ l.volumesLock.Lock()
+ l.volumes[vid] = v
+ l.volumesLock.Unlock()
+ size, _, _ := v.FileStat()
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
+ l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ return true
}
+ return false
}
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
@@ -93,7 +103,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
go func() {
defer wg.Done()
for dir := range task_queue {
- l.loadExistingVolume(dir, needleMapKind)
+ _ = l.loadExistingVolume(dir, needleMapKind)
}
}()
}
@@ -172,8 +182,7 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
if fileInfo, found := l.LocateVolume(vid); found {
- l.loadExistingVolume(fileInfo, needleMapKind)
- return true
+ return l.loadExistingVolume(fileInfo, needleMapKind)
}
return false
}