aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/store.go')
-rw-r--r--weed/storage/store.go65
1 files changed, 38 insertions, 27 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 69bb5bc3b..2d9707571 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -91,11 +91,12 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
+ diskId := uint32(i) // Track disk ID
wg.Add(1)
- go func() {
+ go func(id uint32, diskLoc *DiskLocation) {
defer wg.Done()
- location.loadExistingVolumes(needleMapKind, ldbTimeout)
- }()
+ diskLoc.loadExistingVolumesWithId(needleMapKind, ldbTimeout, id)
+ }(diskId, location)
}
wg.Wait()
@@ -163,14 +164,25 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
- if location := s.FindFreeLocation(func(location *DiskLocation) bool {
- return location.DiskType == diskType
- }); location != nil {
- glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
- location.Directory, vid, collection, replicaPlacement, ttl)
+
+ // Find location and its index
+ var location *DiskLocation
+ var diskId uint32
+ for i, loc := range s.Locations {
+ if loc.DiskType == diskType && s.hasFreeDiskLocation(loc) {
+ location = loc
+ diskId = uint32(i)
+ break
+ }
+ }
+
+ if location != nil {
+ glog.V(0).Infof("In dir %s (disk ID %d) adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, diskId, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, ver, memoryMapMaxSizeMb, ldbTimeout); err == nil {
+ volume.diskId = diskId // Set the disk ID
location.SetVolume(vid, volume)
- glog.V(0).Infof("add volume %d", vid)
+ glog.V(0).Infof("add volume %d on disk ID %d", vid, diskId)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(vid),
Collection: collection,
@@ -178,6 +190,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
Version: uint32(volume.Version()),
Ttl: ttl.ToUint32(),
DiskType: string(diskType),
+ DiskId: diskId,
}
return nil
} else {
@@ -187,6 +200,11 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("No more free space left")
}
+// hasFreeDiskLocation checks if a disk location has free space
+func (s *Store) hasFreeDiskLocation(location *DiskLocation) bool {
+ return int64(location.VolumesLen()) < int64(location.MaxVolumeCount)
+}
+
func (s *Store) VolumeInfos() (allStats []*VolumeInfo) {
for _, location := range s.Locations {
stats := collectStatsForOneLocation(location)
@@ -218,21 +236,10 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) {
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
DiskType: v.DiskType().String(),
+ DiskId: v.diskId,
}
s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
-
- v.dataFileAccessLock.RLock()
- defer v.dataFileAccessLock.RUnlock()
-
- if v.nm == nil {
- return
- }
-
- s.FileCount = v.nm.FileCount()
- s.DeleteCount = v.nm.DeletedCount()
- s.DeletedByteCount = v.nm.DeletedSize()
- s.Size = v.nm.ContentSize()
-
+ s.Size, _, _ = v.FileStat()
return
}
@@ -384,7 +391,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeEcShardInformationMessage) {
- for _, location := range s.Locations {
+ for diskId, location := range s.Locations {
// Collect ecVolume to be deleted
var toDeleteEvs []*erasure_coding.EcVolume
location.ecVolumesLock.RLock()
@@ -392,7 +399,7 @@ func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeE
if ev.IsTimeToDestroy() {
toDeleteEvs = append(toDeleteEvs, ev)
} else {
- messages := ev.ToVolumeEcShardInformationMessage()
+ messages := ev.ToVolumeEcShardInformationMessage(uint32(diskId))
ecShards = append(ecShards, messages...)
}
}
@@ -400,7 +407,7 @@ func (s *Store) deleteExpiredEcVolumes() (ecShards, deleted []*master_pb.VolumeE
// Delete expired volumes
for _, ev := range toDeleteEvs {
- messages := ev.ToVolumeEcShardInformationMessage()
+ messages := ev.ToVolumeEcShardInformationMessage(uint32(diskId))
// deleteEcVolumeById has its own lock
err := location.deleteEcVolumeById(ev.VolumeId)
if err != nil {
@@ -515,10 +522,11 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
}
func (s *Store) MountVolume(i needle.VolumeId) error {
- for _, location := range s.Locations {
- if found := location.LoadVolume(i, s.NeedleMapKind); found == true {
+ for diskId, location := range s.Locations {
+ if found := location.LoadVolume(uint32(diskId), i, s.NeedleMapKind); found == true {
glog.V(0).Infof("mount volume %d", i)
v := s.findVolume(i)
+ v.diskId = uint32(diskId) // Set disk ID when mounting
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
Collection: v.Collection,
@@ -526,6 +534,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
+ DiskId: uint32(diskId),
}
return nil
}
@@ -546,6 +555,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
+ DiskId: v.diskId,
}
for _, location := range s.Locations {
@@ -574,6 +584,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId, onlyEmpty bool) error {
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
DiskType: string(v.location.DiskType),
+ DiskId: v.diskId,
}
for _, location := range s.Locations {
err := location.DeleteVolume(i, onlyEmpty)