aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/disk_location.go11
-rw-r--r--weed/storage/store.go44
-rw-r--r--weed/storage/volume.go7
-rw-r--r--weed/storage/volume_info.go3
-rw-r--r--weed/storage/volume_type.go23
5 files changed, 69 insertions, 19 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 359d0cec2..c6619b260 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -19,6 +19,7 @@ import (
type DiskLocation struct {
Directory string
IdxDirectory string
+ DiskType DiskType
MaxVolumeCount int
OriginalMaxVolumeCount int
MinFreeSpacePercent float32
@@ -32,7 +33,7 @@ type DiskLocation struct {
isDiskSpaceLow bool
}
-func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation {
+func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType DiskType) *DiskLocation {
dir = util.ResolvePath(dir)
if idxDir == "" {
idxDir = dir
@@ -42,6 +43,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
location := &DiskLocation{
Directory: dir,
IdxDirectory: idxDir,
+ DiskType: diskType,
MaxVolumeCount: maxVolumeCount,
OriginalMaxVolumeCount: maxVolumeCount,
MinFreeSpacePercent: minFreeSpacePercent,
@@ -354,3 +356,10 @@ func (l *DiskLocation) CheckDiskSpace() {
}
}
+
+func (l *DiskLocation) GetDiskType() string {
+ if l.DiskType == SsdType {
+ return "SSD"
+ }
+ return "HDD"
+} \ No newline at end of file
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 83c40a01a..8351ecf3b 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -52,11 +52,11 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) {
+func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType, diskTypes []DiskType) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
- location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder)
+ location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
-func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
+func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
- e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
+ e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
}
return nil
}
-func (s *Store) FindFreeLocation() (ret *DiskLocation) {
+func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
+ if diskType != location.DiskType {
+ continue
+ }
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
@@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
- if location := s.FindFreeLocation(); location != nil {
+ if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
@@ -203,12 +206,18 @@ func (s *Store) GetRack() string {
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
+ maxSsdVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
- maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ switch location.DiskType {
+ case SsdType:
+ maxSsdVolumeCount = maxSsdVolumeCount + location.MaxVolumeCount
+ case HardDriveType:
+ maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ }
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
@@ -280,15 +289,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
return &master_pb.Heartbeat{
- Ip: s.Ip,
- Port: uint32(s.Port),
- PublicUrl: s.PublicUrl,
- MaxVolumeCount: uint32(maxVolumeCount),
- MaxFileKey: NeedleIdToUint64(maxFileKey),
- DataCenter: s.dataCenter,
- Rack: s.rack,
- Volumes: volumeMessages,
- HasNoVolumes: len(volumeMessages) == 0,
+ Ip: s.Ip,
+ Port: uint32(s.Port),
+ PublicUrl: s.PublicUrl,
+ MaxVolumeCount: uint32(maxVolumeCount),
+ MaxSsdVolumeCount: uint32(maxSsdVolumeCount),
+ MaxFileKey: NeedleIdToUint64(maxFileKey),
+ DataCenter: s.dataCenter,
+ Rack: s.rack,
+ Volumes: volumeMessages,
+ HasNoVolumes: len(volumeMessages) == 0,
}
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 7712c5eda..1905a85a5 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -47,7 +47,7 @@ type Volume struct {
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation
- lastIoError error
+ lastIoError error
}
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
@@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
+func (v *Volume) DiskType() DiskType {
+ return v.location.DiskType
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
+ DiskType: string(v.location.DiskType),
}
volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index 313818cde..24577618e 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -14,6 +14,7 @@ type VolumeInfo struct {
Size uint64
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
+ DiskType string
Collection string
Version needle.Version
FileCount int
@@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
+ DiskType: m.DiskType,
}
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
@@ -90,6 +92,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey,
+ DiskType: vi.DiskType,
}
}
diff --git a/weed/storage/volume_type.go b/weed/storage/volume_type.go
new file mode 100644
index 000000000..05cd6b2c3
--- /dev/null
+++ b/weed/storage/volume_type.go
@@ -0,0 +1,23 @@
+package storage
+
+import "fmt"
+
+type DiskType string
+
+const (
+ HardDriveType DiskType = ""
+ SsdType = "ssd"
+)
+
+func ToDiskType(vt string) (diskType DiskType, err error) {
+ diskType = HardDriveType
+ switch vt {
+ case "", "hdd":
+ diskType = HardDriveType
+ case "ssd":
+ diskType = SsdType
+ default:
+ err = fmt.Errorf("parse DiskType %s: expecting hdd or ssd\n", vt)
+ }
+ return
+}