diff options
Diffstat (limited to 'weed/storage/disk_location.go')
| -rw-r--r-- | weed/storage/disk_location.go | 239 |
1 files changed, 155 insertions, 84 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9589d9281..f15303282 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -1,76 +1,91 @@ package storage import ( + "fmt" "io/ioutil" "os" "strings" "sync" - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) type DiskLocation struct { Directory string MaxVolumeCount int - volumes map[VolumeId]*Volume - sync.RWMutex + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex + + // erasure coding + ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume + ecVolumesLock sync.RWMutex } func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} - location.volumes = make(map[VolumeId]*Volume) + location.volumes = make(map[needle.VolumeId]*Volume) + location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) return location } -func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) { +func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) { name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".dat") { - collection := "" - base := name[:len(name)-len(".dat")] - i := strings.LastIndex(base, "_") - if i > 0 { - collection, base = base[0:i], base[i+1:] - } - vol, err := NewVolumeId(base) - return vol, collection, err + if !dir.IsDir() && strings.HasSuffix(name, ".idx") { + base := name[:len(name)-len(".idx")] + collection, volumeId, err := parseCollectionVolumeId(base) + return volumeId, collection, err } return 0, "", fmt.Errorf("Path is not a volume: %s", name) } -func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".dat") { - vid, collection, err := l.volumeIdFromPath(dir) - if err == nil { - mutex.RLock() - _, found := l.volumes[vid] - mutex.RUnlock() - if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil { - mutex.Lock() - l.volumes[vid] = v - mutex.Unlock() - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) - } else { - glog.V(0).Infof("new volume %s error %s", name, e) - } - } - } +func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) { + i := strings.LastIndex(base, "_") + if i > 0 { + collection, base = base[0:i], base[i+1:] } + vol, err := needle.NewVolumeId(base) + return collection, vol, err } -func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) { - var concurrency int - if concurrentFlag { - //You could choose a better optimized concurency value after testing at your environment - concurrency = 10 - } else { - concurrency = 1 +func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { + name := fileInfo.Name() + if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { + vid, collection, err := l.volumeIdFromPath(fileInfo) + if err != nil { + glog.Warningf("get volume id failed, %s, err : %s", name, err) + return false + } + + // void loading one volume more than once + l.volumesLock.RLock() + _, found := l.volumes[vid] + l.volumesLock.RUnlock() + if found { + glog.V(1).Infof("loaded volume, %v", vid) + return true + } + + v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0) + if e != nil { + glog.V(0).Infof("new volume %s error %s", name, e) + return false + } + + l.volumesLock.Lock() + l.volumes[vid] = v + l.volumesLock.Unlock() + size, _, _ := v.FileStat() + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", + l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + return true } + return false +} + +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { task_queue := make(chan os.FileInfo, 10*concurrency) go func() { @@ -83,13 +98,12 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con }() var wg sync.WaitGroup - var mutex sync.RWMutex for workerNum := 0; workerNum < concurrency; workerNum++ { wg.Add(1) go func() { defer wg.Done() for dir := range task_queue { - l.loadExistingVolume(dir, needleMapKind, &mutex) + _ = l.loadExistingVolume(dir, needleMapKind) } }() } @@ -98,30 +112,62 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con } func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { - l.Lock() - defer l.Unlock() - l.concurrentLoadingVolumes(needleMapKind, true) + l.concurrentLoadingVolumes(needleMapKind, 10) + glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) + + l.loadAllEcShards() + glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes)) - glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount) } func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { - l.Lock() - defer l.Unlock() - for k, v := range l.volumes { - if v.Collection == collection { - e = l.deleteVolumeById(k) - if e != nil { - return + l.volumesLock.Lock() + delVolsMap := l.unmountVolumeByCollection(collection) + l.volumesLock.Unlock() + + l.ecVolumesLock.Lock() + delEcVolsMap := l.unmountEcVolumeByCollection(collection) + l.ecVolumesLock.Unlock() + + errChain := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(2) + go func() { + for _, v := range delVolsMap { + if err := v.Destroy(); err != nil { + errChain <- err } } + wg.Done() + }() + + go func() { + for _, v := range delEcVolsMap { + v.Destroy() + } + wg.Done() + }() + + go func() { + wg.Wait() + close(errChain) + }() + + errBuilder := strings.Builder{} + for err := range errChain { + errBuilder.WriteString(err.Error()) + errBuilder.WriteString("; ") } + if errBuilder.Len() > 0 { + e = fmt.Errorf(errBuilder.String()) + } + return } -func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { +func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { v, ok := l.volumes[vid] if !ok { return @@ -134,24 +180,16 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) { return } -func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool { - if dirs, err := ioutil.ReadDir(l.Directory); err == nil { - for _, dir := range dirs { - volId, _, err := l.volumeIdFromPath(dir) - if vid == volId && err == nil { - var mutex sync.RWMutex - l.loadExistingVolume(dir, needleMapKind, &mutex) - return true - } - } +func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { + if fileInfo, found := l.LocateVolume(vid); found { + return l.loadExistingVolume(fileInfo, needleMapKind) } - return false } -func (l *DiskLocation) DeleteVolume(vid VolumeId) error { - l.Lock() - defer l.Unlock() +func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { + l.volumesLock.Lock() + defer l.volumesLock.Unlock() _, ok := l.volumes[vid] if !ok { @@ -160,9 +198,9 @@ func (l *DiskLocation) DeleteVolume(vid VolumeId) error { return l.deleteVolumeById(vid) } -func (l *DiskLocation) UnloadVolume(vid VolumeId) error { - l.Lock() - defer l.Unlock() +func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { + l.volumesLock.Lock() + defer l.volumesLock.Unlock() v, ok := l.volumes[vid] if !ok { @@ -173,34 +211,67 @@ func (l *DiskLocation) UnloadVolume(vid VolumeId) error { return nil } -func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) { - l.Lock() - defer l.Unlock() +func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { + deltaVols := make(map[needle.VolumeId]*Volume, 0) + for k, v := range l.volumes { + if v.Collection == collectionName && !v.isCompacting { + deltaVols[k] = v + } + } + + for k := range deltaVols { + delete(l.volumes, k) + } + return deltaVols +} + +func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { + l.volumesLock.Lock() + defer l.volumesLock.Unlock() l.volumes[vid] = volume } -func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) { - l.RLock() - defer l.RUnlock() +func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() v, ok := l.volumes[vid] return v, ok } func (l *DiskLocation) VolumesLen() int { - l.RLock() - defer l.RUnlock() + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() return len(l.volumes) } func (l *DiskLocation) Close() { - l.Lock() - defer l.Unlock() - + l.volumesLock.Lock() for _, v := range l.volumes { v.Close() } + l.volumesLock.Unlock() + + l.ecVolumesLock.Lock() + for _, ecVolume := range l.ecVolumes { + ecVolume.Close() + } + l.ecVolumesLock.Unlock() + return } + +func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) { + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, fileInfo := range fileInfos { + volId, _, err := l.volumeIdFromPath(fileInfo) + if vid == volId && err == nil { + return fileInfo, true + } + } + } + + return nil, false +} |
