diff options
Diffstat (limited to 'weed/storage/store.go')
| -rw-r--r-- | weed/storage/store.go | 53 |
1 files changed, 32 insertions, 21 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go index ff28be47c..47829666a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -40,7 +40,7 @@ type Store struct { dataCenter string // optional informaton, overwriting master setting if exists rack string // optional information, overwriting master setting if exists connected bool - NeedleMapType NeedleMapType + NeedleMapKind NeedleMapKind NewVolumesChan chan master_pb.VolumeShortInformationMessage DeletedVolumesChan chan master_pb.VolumeShortInformationMessage NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage @@ -52,11 +52,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) { - s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { + s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) @@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume { } return nil } -func (s *Store) FindFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { max := 0 for _, location := range s.Locations { + if diskType != location.DiskType { + continue + } currentFreeCount := location.MaxVolumeCount - location.VolumesLen() currentFreeCount *= erasure_coding.DataShardsCount currentFreeCount -= location.EcVolumesLen() @@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(); location != nil { + if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { @@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind ReplicaPlacement: uint32(replicaPlacement.Byte()), Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), + DiskType: string(diskType), } return nil } else { @@ -169,6 +173,7 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) { ReadOnly: v.IsReadOnly(), Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), + DiskType: v.DiskType().String(), } s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() @@ -202,13 +207,13 @@ func (s *Store) GetRack() string { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage - maxVolumeCount := 0 + maxVolumeCounts := make(map[string]uint32) var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount) location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() @@ -280,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } return &master_pb.Heartbeat{ - Ip: s.Ip, - Port: uint32(s.Port), - PublicUrl: s.PublicUrl, - MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: NeedleIdToUint64(maxFileKey), - DataCenter: s.dataCenter, - Rack: s.rack, - Volumes: volumeMessages, - HasNoVolumes: len(volumeMessages) == 0, + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCounts: maxVolumeCounts, + MaxFileKey: NeedleIdToUint64(maxFileKey), + DataCenter: s.dataCenter, + Rack: s.rack, + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, } } @@ -362,7 +367,7 @@ func (s *Store) MarkVolumeWritable(i needle.VolumeId) error { func (s *Store) MountVolume(i needle.VolumeId) error { for _, location := range s.Locations { - if found := location.LoadVolume(i, s.NeedleMapType); found == true { + if found := location.LoadVolume(i, s.NeedleMapKind); found == true { glog.V(0).Infof("mount volume %d", i) v := s.findVolume(i) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ @@ -371,6 +376,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } return nil } @@ -390,6 +396,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { @@ -414,6 +421,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { if err := location.DeleteVolume(i); err == nil { @@ -463,6 +471,9 @@ func (s *Store) GetVolumeSizeLimit() uint64 { func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { volumeSizeLimit := s.GetVolumeSizeLimit() + if volumeSizeLimit == 0 { + return + } for _, diskLocation := range s.Locations { if diskLocation.OriginalMaxVolumeCount == 0 { currentMaxVolumeCount := diskLocation.MaxVolumeCount |
