diff options
Diffstat (limited to 'weed/storage/store.go')
| -rw-r--r-- | weed/storage/store.go | 65 |
1 files changed, 38 insertions, 27 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go index 69bb5bc3b..2d9707571 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -91,11 +91,12 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) + diskId := uint32(i) // Track disk ID wg.Add(1) - go func() { + go func(id uint32, diskLoc *DiskLocation) { defer wg.Done() - location.loadExistingVolumes(needleMapKind, ldbTimeout) - }() + diskLoc.loadExistingVolumesWithId(needleMapKind, ldbTimeout, id) + }(diskId, location) } wg.Wait() @@ -163,14 +164,25 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(func(location *DiskLocation) bool { - return location.DiskType == diskType - }); location != nil { - glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", - location.Directory, vid, collection, replicaPlacement, ttl) + + // Find location and its index + var location *DiskLocation + var diskId uint32 + for i, loc := range s.Locations { + if loc.DiskType == diskType && s.hasFreeDiskLocation(loc) { + location = loc + diskId = uint32(i) + break + } + } + + if location != nil { + glog.V(0).Infof("In dir %s (disk ID %d) adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, diskId, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, ver, memoryMapMaxSizeMb, ldbTimeout); err == nil { + volume.diskId = diskId // Set the disk ID location.SetVolume(vid, volume) - glog.V(0).Infof("add volume %d", vid) + glog.V(0).Infof("add volume %d on disk ID %d", vid, diskId) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ Id: uint32(vid), Collection: collection, @@ -178,6 +190,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), DiskType: string(diskType), + DiskId: diskId, } return nil } else { @@ -187,6 +200,11 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind return fmt.Errorf("No more free space left") } +// hasFreeDiskLocation checks if a disk location has free space +func (s *Store) hasFreeDiskLocation(location *DiskLocation) bool { + return int64(location.VolumesLen()) < int64(location.MaxVolumeCount) +} + func (s *Store) VolumeInfos() (allStats []*VolumeInfo) { for _, location := range s.Locations { stats := collectStatsForOneLocation(location) @@ -218,21 +236,10 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) { Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), DiskType: v.DiskType().String(), + DiskId: v.diskId, } s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() - - v.dataFileAccessLock.RLock() - defer v.dataFileAccessLock.RUnlock() - - if v.nm == nil { - return - } - - s.FileCount = v.nm.FileCount() - s.DeleteCount = v.nm.DeletedCount() - s.DeletedByteCount = v.nm.DeletedSize() - s.Size = v.nm.ContentSize() - + s.Size, _, _ = v.FileStat() return } @@ -384,7 +391,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) { - for _, location := range s.Locations { + for diskId, location := range s.Locations { // Collect ecVolume to be deleted var toDeleteEvs []*erasure_coding.EcVolume location.ecVolumesLock.RLock() @@ -392,7 +399,7 @@ func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeE if ev.IsTimeToDestroy() { toDeleteEvs = append(toDeleteEvs, ev) } else { - messages := ev.ToVolumeEcShardInformationMessage() + messages := ev.ToVolumeEcShardInformationMessage(uint32(diskId)) ecShards = append(ecShards, messages...) } } @@ -400,7 +407,7 @@ func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeE // Delete expired volumes for _, ev := range toDeleteEvs { - messages := ev.ToVolumeEcShardInformationMessage() + messages := ev.ToVolumeEcShardInformationMessage(uint32(diskId)) // deleteEcVolumeById has its own lock err := location.deleteEcVolumeById(ev.VolumeId) if err != nil { @@ -515,10 +522,11 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error { } func (s *Store) MountVolume(i needle.VolumeId) error { - for _, location := range s.Locations { - if found := location.LoadVolume(i, s.NeedleMapKind); found == true { + for diskId, location := range s.Locations { + if found := location.LoadVolume(uint32(diskId), i, s.NeedleMapKind); found == true { glog.V(0).Infof("mount volume %d", i) v := s.findVolume(i) + v.diskId = uint32(diskId) // Set disk ID when mounting s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ Id: uint32(v.Id), Collection: v.Collection, @@ -526,6 +534,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), DiskType: string(v.location.DiskType), + DiskId: uint32(diskId), } return nil } @@ -546,6 +555,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), DiskType: string(v.location.DiskType), + DiskId: v.diskId, } for _, location := range s.Locations { @@ -574,6 +584,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error { Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), DiskType: string(v.location.DiskType), + DiskId: v.diskId, } for _, location := range s.Locations { err := location.DeleteVolume(i, onlyEmpty) |
