aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume.go')
-rw-r--r--weed/storage/volume.go106
1 files changed, 84 insertions, 22 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index acede66bf..e0638d8a8 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -21,20 +21,23 @@ import (
type Volume struct {
Id needle.VolumeId
dir string
+ dirIdx string
Collection string
DataBackend backend.BackendStorageFile
nm NeedleMapper
- needleMapKind NeedleMapType
+ needleMapKind NeedleMapKind
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
+ noWriteLock sync.RWMutex
hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
- lastModifiedTsSeconds uint64 //unix time in seconds
- lastAppendAtNs uint64 //unix time in nanoseconds
+ asyncRequestsChan chan *needle.AsyncRequest
+ lastModifiedTsSeconds uint64 // unix time in seconds
+ lastAppendAtNs uint64 // unix time in nanoseconds
lastCompactIndexOffset uint64
lastCompactRevision uint16
@@ -42,18 +45,26 @@ type Volume struct {
isCompacting bool
volumeInfo *volume_server_pb.VolumeInfo
+ location *DiskLocation
+
+ lastIoError error
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
- v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
+ v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
+ asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
+ v.startWorker()
return
}
+
func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
+ return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -65,10 +76,24 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) {
}
return
}
-func (v *Volume) FileName() (fileName string) {
+
+func (v *Volume) DataFileName() (fileName string) {
return VolumeFileName(v.dir, v.Collection, int(v.Id))
}
+func (v *Volume) IndexFileName() (fileName string) {
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))
+}
+
+func (v *Volume) FileName(ext string) (fileName string) {
+ switch ext {
+ case ".idx", ".cpx", ".ldb":
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext
+ }
+ // .dat, .cpd, .vif
+ return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext
+}
+
func (v *Volume) Version() needle.Version {
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
@@ -146,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
+func (v *Volume) DiskType() types.DiskType {
+ return v.location.DiskType
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -169,20 +198,20 @@ func (v *Volume) NeedToReplicate() bool {
// except when volume is empty
// or when the volume does not have a ttl
// or when volumeSizeLimit is 0 when server just starts
-func (v *Volume) expired(volumeSizeLimit uint64) bool {
+func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool {
if volumeSizeLimit == 0 {
- //skip if we don't know size limit
+ // skip if we don't know size limit
return false
}
- if v.ContentSize() == 0 {
+ if contentSize <= super_block.SuperBlockSize {
return false
}
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("volume %d now:%v lastModified:%v", v.Id, time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("volume %d ttl:%v lived:%v", v.Id, v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
@@ -205,27 +234,54 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
return false
}
-func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
- size, _, modTime := v.FileStat()
+func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ glog.V(3).Infof("collectStatus volume %d", v.Id)
- volumInfo := &master_pb.VolumeInformationMessage{
+ if v.nm == nil {
+ return
+ }
+
+ ok = true
+
+ maxFileKey = v.nm.MaxFileKey()
+ datFileSize, modTime, _ = v.DataBackend.GetStat()
+ fileCount = uint64(v.nm.FileCount())
+ deletedCount = uint64(v.nm.DeletedCount())
+ deletedSize = v.nm.DeletedSize()
+ fileCount = uint64(v.nm.FileCount())
+
+ return
+}
+
+func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
+
+ maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus()
+
+ if !ok {
+ return 0, nil
+ }
+
+ volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
- Size: size,
+ Size: uint64(volumeSize),
Collection: v.Collection,
- FileCount: v.FileCount(),
- DeleteCount: v.DeletedCount(),
- DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.noWriteOrDelete,
+ FileCount: fileCount,
+ DeleteCount: deletedCount,
+ DeletedByteCount: deletedSize,
+ ReadOnly: v.IsReadOnly(),
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
+ DiskType: string(v.location.DiskType),
}
- volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey()
+ volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
- return volumInfo
+ return maxFileKey, volumeInfo
}
func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
@@ -237,3 +293,9 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
}
return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey()
}
+
+func (v *Volume) IsReadOnly() bool {
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
+ return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
+}