diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/disk_location.go | 5 | ||||
| -rw-r--r-- | weed/storage/disk_location_ec.go | 4 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_shard.go | 6 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume.go | 6 | ||||
| -rw-r--r-- | weed/storage/erasure_coding/ec_volume_info.go | 6 | ||||
| -rw-r--r-- | weed/storage/store.go | 47 | ||||
| -rw-r--r-- | weed/storage/store_ec.go | 2 | ||||
| -rw-r--r-- | weed/storage/types/volume_disk_type.go | 33 | ||||
| -rw-r--r-- | weed/storage/volume.go | 5 | ||||
| -rw-r--r-- | weed/storage/volume_checking.go | 16 | ||||
| -rw-r--r-- | weed/storage/volume_info.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_loading.go | 4 |
12 files changed, 112 insertions, 26 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 76eab64a3..6de87c793 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "os" "path/filepath" @@ -19,6 +20,7 @@ import ( type DiskLocation struct { Directory string IdxDirectory string + DiskType types.DiskType MaxVolumeCount int OriginalMaxVolumeCount int MinFreeSpacePercent float32 @@ -32,7 +34,7 @@ type DiskLocation struct { isDiskSpaceLow bool } -func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string) *DiskLocation { +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation { dir = util.ResolvePath(dir) if idxDir == "" { idxDir = dir @@ -42,6 +44,7 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32 location := &DiskLocation{ Directory: dir, IdxDirectory: idxDir, + DiskType: diskType, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent, diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index d1237b40f..0ab45011c 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -57,7 +57,7 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) { - ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId) + ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) if err != nil { if err == os.ErrNotExist { return os.ErrNotExist @@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) if err != nil { return fmt.Errorf("failed to create ec volume %d: %v", vid, err) } diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 74ed99198..2a57d85ef 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -2,6 +2,7 @@ package erasure_coding import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "path" "strconv" @@ -20,11 +21,12 @@ type EcVolumeShard struct { dir string ecdFile *os.File ecdFileSize int64 + DiskType types.DiskType } -func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { +func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { - v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId} + v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType} baseFileName := v.FileName() diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index a9d08ed0e..85d6a5fc8 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -36,10 +36,11 @@ type EcVolume struct { Version needle.Version ecjFile *os.File ecjFileAccessLock sync.Mutex + diskType types.DiskType } -func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { - ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid} +func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { + ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType} dataBaseFileName := EcShardFileName(collection, dir, int(vid)) indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) @@ -191,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V m = &master_pb.VolumeEcShardInformationMessage{ Id: uint32(s.VolumeId), Collection: s.Collection, + DiskType: string(ev.diskType), } messages = append(messages, m) } diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 8ff65bb0f..3dd535e64 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -10,13 +10,15 @@ type EcVolumeInfo struct { VolumeId needle.VolumeId Collection string ShardBits ShardBits + DiskType string } -func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { +func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { return &EcVolumeInfo{ Collection: collection, VolumeId: vid, ShardBits: shardBits, + DiskType: diskType, } } @@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { VolumeId: ecInfo.VolumeId, Collection: ecInfo.Collection, ShardBits: ecInfo.ShardBits.Minus(other.ShardBits), + DiskType: ecInfo.DiskType, } return ret @@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb. Id: uint32(ecInfo.VolumeId), EcIndexBits: uint32(ecInfo.ShardBits), Collection: ecInfo.Collection, + DiskType: ecInfo.DiskType, } } diff --git a/weed/storage/store.go b/weed/storage/store.go index bd66e23f4..47829666a 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -52,11 +52,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind) (s *Store) { +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) @@ -69,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -78,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -100,9 +100,12 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume { } return nil } -func (s *Store) FindFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { max := 0 for _, location := range s.Locations { + if diskType != location.DiskType { + continue + } currentFreeCount := location.MaxVolumeCount - location.VolumesLen() currentFreeCount *= erasure_coding.DataShardsCount currentFreeCount -= location.EcVolumesLen() @@ -114,11 +117,11 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(); location != nil { + if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { @@ -130,6 +133,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind ReplicaPlacement: uint32(replicaPlacement.Byte()), Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), + DiskType: string(diskType), } return nil } else { @@ -169,6 +173,7 @@ func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) { ReadOnly: v.IsReadOnly(), Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), + DiskType: v.DiskType().String(), } s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() @@ -202,13 +207,13 @@ func (s *Store) GetRack() string { func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage - maxVolumeCount := 0 + maxVolumeCounts := make(map[string]uint32) var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount) location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() @@ -280,15 +285,15 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } return &master_pb.Heartbeat{ - Ip: s.Ip, - Port: uint32(s.Port), - PublicUrl: s.PublicUrl, - MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: NeedleIdToUint64(maxFileKey), - DataCenter: s.dataCenter, - Rack: s.rack, - Volumes: volumeMessages, - HasNoVolumes: len(volumeMessages) == 0, + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCounts: maxVolumeCounts, + MaxFileKey: NeedleIdToUint64(maxFileKey), + DataCenter: s.dataCenter, + Rack: s.rack, + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, } } @@ -371,6 +376,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } return nil } @@ -390,6 +396,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { @@ -414,6 +421,7 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { if err := location.DeleteVolume(i); err == nil { @@ -463,6 +471,9 @@ func (s *Store) GetVolumeSizeLimit() uint64 { func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { volumeSizeLimit := s.GetVolumeSizeLimit() + if volumeSizeLimit == 0 { + return + } for _, diskLocation := range s.Locations { if diskLocation.OriginalMaxVolumeCount == 0 { currentMaxVolumeCount := diskLocation.MaxVolumeCount diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index ab4e96634..a9b6a8ff3 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -58,6 +58,7 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er Id: uint32(vid), Collection: collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(location.DiskType), } return nil } else if err == os.ErrNotExist { @@ -82,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar Id: uint32(vid), Collection: ecShard.Collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(ecShard.DiskType), } for _, location := range s.Locations { diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go new file mode 100644 index 000000000..25056ee10 --- /dev/null +++ b/weed/storage/types/volume_disk_type.go @@ -0,0 +1,33 @@ +package types + +import ( + "strings" +) + +type DiskType string + +const ( + HardDriveType DiskType = "" + SsdType = "ssd" +) + +func ToDiskType(vt string) (diskType DiskType) { + vt = strings.ToLower(vt) + diskType = HardDriveType + switch vt { + case "", "hdd": + diskType = HardDriveType + case "ssd": + diskType = SsdType + default: + diskType = DiskType(vt) + } + return +} + +func (diskType DiskType) String() string { + if diskType == "" { + return "" + } + return string(diskType) +} diff --git a/weed/storage/volume.go b/weed/storage/volume.go index d86e25885..366449c53 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -171,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 { return v.nm.IndexFileSize() } +func (v *Volume) DiskType() types.DiskType { + return v.location.DiskType +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -262,6 +266,7 @@ func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.Volume Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), + DiskType: string(v.location.DiskType), } volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 8d63c39c1..b76933083 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" "os" @@ -148,3 +149,18 @@ func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.V } return n.AppendAtNs, err } + +func (v *Volume) checkIdxFile() error { + datFileSize, _, err := v.DataBackend.GetStat() + if err != nil { + return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err) + } + if datFileSize <= super_block.SuperBlockSize { + return nil + } + indexFileName := v.FileName(".idx") + if util.FileExists(indexFileName) { + return nil + } + return fmt.Errorf("idx file %s does not exists", indexFileName) +} diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index 313818cde..9c64c9682 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -14,6 +14,7 @@ type VolumeInfo struct { Size uint64 ReplicaPlacement *super_block.ReplicaPlacement Ttl *needle.TTL + DiskType string Collection string Version needle.Version FileCount int @@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er ModifiedAtSecond: m.ModifiedAtSecond, RemoteStorageName: m.RemoteStorageName, RemoteStorageKey: m.RemoteStorageKey, + DiskType: m.DiskType, } rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { @@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu } vi.ReplicaPlacement = rp vi.Ttl = needle.LoadTTLFromUint32(m.Ttl) + vi.DiskType = m.DiskType return vi, nil } @@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe ModifiedAtSecond: vi.ModifiedAtSecond, RemoteStorageName: vi.RemoteStorageName, RemoteStorageKey: vi.RemoteStorageKey, + DiskType: vi.DiskType, } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 52a50a98c..bff1055bb 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -96,6 +96,10 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind v.dirIdx = v.dir } } + // check volume idx files + if err := v.checkIdxFile(); err != nil { + glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err) + } var indexFile *os.File if v.noWriteOrDelete { glog.V(0).Infoln("open to read file", v.FileName(".idx")) |
