diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/disk_location.go | 11 | ||||
| -rw-r--r-- | weed/storage/store.go | 48 | ||||
| -rw-r--r-- | weed/storage/volume.go | 7 | ||||
| -rw-r--r-- | weed/storage/volume_info.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_loading.go | 8 | ||||
| -rw-r--r-- | weed/storage/volume_type.go | 23 |
6 files changed, 78 insertions, 23 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 9b2ab69fe..ce42232a7 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -19,6 +19,7 @@ import ( type DiskLocation struct { Directory string IdxDirectory string + DiskType DiskType MaxVolumeCount int OriginalMaxVolumeCount int MinFreeSpacePercent float32 @@ -32,7 +33,7 @@ type DiskLocation struct { isDiskSpaceLow bool } -func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation { +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType DiskType) *DiskLocation { dir = util.ResolvePath(dir) if idxDir == "" { idxDir = dir @@ -42,6 +43,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32 location := &DiskLocation{ Directory: dir, IdxDirectory: idxDir, + DiskType: diskType, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent, @@ -371,3 +373,10 @@ func (l *DiskLocation) CheckDiskSpace() { } } + +func (l *DiskLocation) GetDiskType() string { + if l.DiskType == SsdType { + return "SSD" + } + return "HDD" +} diff --git a/weed/storage/store.go b/weed/storage/store.go index ff28be47c..470ce0c18 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -52,11 +52,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType) (s *Store) { +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapType, diskTypes []DiskType) (s *Store) { s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) @@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume { } return nil } -func (s *Store) FindFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { max := 0 for _, location := range s.Locations { + if diskType != location.DiskType { + continue + } currentFreeCount := location.MaxVolumeCount - location.VolumesLen() currentFreeCount *= erasure_coding.DataShardsCount currentFreeCount -= location.EcVolumesLen() @@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(); location != nil { + if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { @@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind ReplicaPlacement: uint32(replicaPlacement.Byte()), Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), + DiskType: string(diskType), } return nil } else { @@ -203,12 +207,18 @@ func (s *Store) GetRack() string { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage maxVolumeCount := 0 + maxSsdVolumeCount := 0 var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + switch location.DiskType { + case SsdType: + maxSsdVolumeCount = maxSsdVolumeCount + location.MaxVolumeCount + case HardDriveType: + maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + } location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() @@ -280,15 +290,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } return &master_pb.Heartbeat{ - Ip: s.Ip, - Port: uint32(s.Port), - PublicUrl: s.PublicUrl, - MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: NeedleIdToUint64(maxFileKey), - DataCenter: s.dataCenter, - Rack: s.rack, - Volumes: volumeMessages, - HasNoVolumes: len(volumeMessages) == 0, + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCount: uint32(maxVolumeCount), + MaxSsdVolumeCount: uint32(maxSsdVolumeCount), + MaxFileKey: NeedleIdToUint64(maxFileKey), + DataCenter: s.dataCenter, + Rack: s.rack, + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, } } @@ -371,6 +382,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } return nil } @@ -390,6 +402,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { @@ -414,6 +427,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { if err := location.DeleteVolume(i); err == nil { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 7712c5eda..1905a85a5 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -47,7 +47,7 @@ type Volume struct { volumeInfo *volume_server_pb.VolumeInfo location *DiskLocation - lastIoError error + lastIoError error } func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { @@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 { return v.nm.IndexFileSize() } +func (v *Volume) DiskType() DiskType { + return v.location.DiskType +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), + DiskType: string(v.location.DiskType), } volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index 313818cde..9c64c9682 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -14,6 +14,7 @@ type VolumeInfo struct { Size uint64 ReplicaPlacement *super_block.ReplicaPlacement Ttl *needle.TTL + DiskType string Collection string Version needle.Version FileCount int @@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er ModifiedAtSecond: m.ModifiedAtSecond, RemoteStorageName: m.RemoteStorageName, RemoteStorageKey: m.RemoteStorageKey, + DiskType: m.DiskType, } rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { @@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu } vi.ReplicaPlacement = rp vi.Ttl = needle.LoadTTLFromUint32(m.Ttl) + vi.DiskType = m.DiskType return vi, nil } @@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe ModifiedAtSecond: vi.ModifiedAtSecond, RemoteStorageName: vi.RemoteStorageName, RemoteStorageKey: vi.RemoteStorageKey, + DiskType: vi.DiskType, } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 34eee876d..a6efc630d 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -92,7 +92,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if err == nil && alsoLoadIndex { // adjust for existing volumes with .idx together with .dat files if v.dirIdx != v.dir { - if util.FileExists(v.DataFileName()+".idx") { + if util.FileExists(v.DataFileName() + ".idx") { v.dirIdx = v.dir } } @@ -100,12 +100,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if v.noWriteOrDelete { glog.V(0).Infoln("open to read file", v.FileName(".idx")) if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil { - return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) + return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) } } else { glog.V(1).Infoln("open to write file", v.FileName(".idx")) if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil { - return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) + return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) } } if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { @@ -115,7 +115,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if v.noWriteOrDelete || v.noWriteCanDelete { if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil { - glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) + glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) } } else { switch needleMapKind { diff --git a/weed/storage/volume_type.go b/weed/storage/volume_type.go new file mode 100644 index 000000000..3e8d5ffb5 --- /dev/null +++ b/weed/storage/volume_type.go @@ -0,0 +1,23 @@ +package storage + +import "fmt" + +type DiskType string + +const ( + HardDriveType DiskType = "" + SsdType = "ssd" +) + +func ToDiskType(vt string) (diskType DiskType, err error) { + diskType = HardDriveType + switch vt { + case "", "hdd": + diskType = HardDriveType + case "ssd": + diskType = SsdType + default: + err = fmt.Errorf("parse DiskType %s: expecting hdd or ssd\n", vt) + } + return +} |
