diff options
Diffstat (limited to 'weed/storage/disk_location.go')
| -rw-r--r-- | weed/storage/disk_location.go | 224 |
1 files changed, 169 insertions, 55 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index e116fc715..ed4e00312 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -1,45 +1,68 @@ package storage import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "os" + "path/filepath" "strings" "sync" - - "fmt" + "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) type DiskLocation struct { - Directory string - MaxVolumeCount int - volumes map[needle.VolumeId]*Volume - volumesLock sync.RWMutex + Directory string + IdxDirectory string + DiskType types.DiskType + MaxVolumeCount int + OriginalMaxVolumeCount int + MinFreeSpacePercent float32 + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex // erasure coding ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex + + isDiskSpaceLow bool } -func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation { + dir = util.ResolvePath(dir) + if idxDir == "" { + idxDir = dir + } else { + idxDir = util.ResolvePath(idxDir) + } + location := &DiskLocation{ + Directory: dir, + IdxDirectory: idxDir, + DiskType: diskType, + MaxVolumeCount: maxVolumeCount, + OriginalMaxVolumeCount: maxVolumeCount, + MinFreeSpacePercent: minFreeSpacePercent, + } location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + go location.CheckDiskSpace() return location } -func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".idx") { - base := name[:len(name)-len(".idx")] +func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) { + if isValidVolume(filename) { + base := filename[:len(filename)-4] collection, volumeId, err := parseCollectionVolumeId(base) return volumeId, collection, err } - return 0, "", fmt.Errorf("Path is not a volume: %s", name) + return 0, "", fmt.Errorf("file is not a volume: %s", filename) } func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) { @@ -51,38 +74,83 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI return collection, vol, err } -func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) { - name := fileInfo.Name() - if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { - vid, collection, err := l.volumeIdFromPath(fileInfo) - if err == nil { - l.volumesLock.RLock() - _, found := l.volumes[vid] - l.volumesLock.RUnlock() - if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { - l.volumesLock.Lock() - l.volumes[vid] = v - l.volumesLock.Unlock() - size, _, _ := v.FileStat() - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) - // println("volume", vid, "last append at", v.lastAppendAtNs) - } else { - glog.V(0).Infof("new volume %s error %s", name, e) - } - } - } +func isValidVolume(basename string) bool { + return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif") +} + +func getValidVolumeName(basename string) string { + if isValidVolume(basename) { + return basename[:len(basename)-4] } + return "" } -func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { +func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool { + basename := fileInfo.Name() + if fileInfo.IsDir() { + return false + } + volumeName := getValidVolumeName(basename) + if volumeName == "" { + return false + } + + // check for incomplete volume + noteFile := l.Directory + "/" + volumeName + ".note" + if util.FileExists(noteFile) { + note, _ := ioutil.ReadFile(noteFile) + glog.Warningf("volume %s was not completed: %s", volumeName, string(note)) + removeVolumeFiles(l.Directory + "/" + volumeName) + removeVolumeFiles(l.IdxDirectory + "/" + volumeName) + return false + } + + // parse out collection, volume id + vid, collection, err := volumeIdFromFileName(basename) + if err != nil { + glog.Warningf("get volume id failed, %s, err : %s", volumeName, err) + return false + } + + // avoid loading one volume more than once + l.volumesLock.RLock() + _, found := l.volumes[vid] + l.volumesLock.RUnlock() + if found { + glog.V(1).Infof("loaded volume, %v", vid) + return true + } + + // load the volume + v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0) + if e != nil { + glog.V(0).Infof("new volume %s error %s", volumeName, e) + return false + } + + l.SetVolume(vid, v) + + size, _, _ := v.FileStat() + glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s", + l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + return true +} + +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) { task_queue := make(chan os.FileInfo, 10*concurrency) go func() { - if dirs, err := ioutil.ReadDir(l.Directory); err == nil { - for _, dir := range dirs { - task_queue <- dir + foundVolumeNames := make(map[string]bool) + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, fi := range fileInfos { + volumeName := getValidVolumeName(fi.Name()) + if volumeName == "" { + continue + } + if _, found := foundVolumeNames[volumeName]; !found { + foundVolumeNames[volumeName] = true + task_queue <- fi + } } } close(task_queue) @@ -93,8 +161,8 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con wg.Add(1) go func() { defer wg.Done() - for dir := range task_queue { - l.loadExistingVolume(dir, needleMapKind) + for fi := range task_queue { + _ = l.loadExistingVolume(fi, needleMapKind) } }() } @@ -102,7 +170,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con } -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { l.concurrentLoadingVolumes(needleMapKind, 10) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) @@ -158,7 +226,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er return } -func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { +func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) { v, ok := l.volumes[vid] if !ok { return @@ -167,21 +235,15 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { if e != nil { return } + found = true delete(l.volumes, vid) return } -func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { - if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { - for _, fileInfo := range fileInfos { - volId, _, err := l.volumeIdFromPath(fileInfo) - if vid == volId && err == nil { - l.loadExistingVolume(fileInfo, needleMapKind) - return true - } - } +func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { + if fileInfo, found := l.LocateVolume(vid); found { + return l.loadExistingVolume(fileInfo, needleMapKind) } - return false } @@ -193,7 +255,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { if !ok { return fmt.Errorf("Volume not found, VolumeId: %d", vid) } - return l.deleteVolumeById(vid) + _, err := l.deleteVolumeById(vid) + return err } func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { @@ -217,7 +280,7 @@ func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[need } } - for k, _ := range deltaVols { + for k := range deltaVols { delete(l.volumes, k) } return deltaVols @@ -228,6 +291,7 @@ func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { defer l.volumesLock.Unlock() l.volumes[vid] = volume + volume.location = l } func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { @@ -260,3 +324,53 @@ func (l *DiskLocation) Close() { return } + +func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) { + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, fileInfo := range fileInfos { + volId, _, err := volumeIdFromFileName(fileInfo.Name()) + if vid == volId && err == nil { + return fileInfo, true + } + } + } + + return nil, false +} + +func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) { + + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() + + for _, vol := range l.volumes { + if vol.IsReadOnly() { + continue + } + datSize, idxSize, _ := vol.FileStat() + unUsedSpace += volumeSizeLimit - (datSize + idxSize) + } + + return +} + +func (l *DiskLocation) CheckDiskSpace() { + for { + if dir, e := filepath.Abs(l.Directory); e == nil { + s := stats.NewDiskStatus(dir) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All)) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used)) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free)) + if (s.PercentFree < l.MinFreeSpacePercent) != l.isDiskSpaceLow { + l.isDiskSpaceLow = !l.isDiskSpaceLow + } + if l.isDiskSpaceLow { + glog.V(0).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow) + } else { + glog.V(4).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow) + } + } + time.Sleep(time.Minute) + } + +} |
