diff options
Diffstat (limited to 'weed/storage/volume.go')
| -rw-r--r-- | weed/storage/volume.go | 106 |
1 files changed, 84 insertions, 22 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go index acede66bf..e0638d8a8 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -21,20 +21,23 @@ import ( type Volume struct { Id needle.VolumeId dir string + dirIdx string Collection string DataBackend backend.BackendStorageFile nm NeedleMapper - needleMapKind NeedleMapType + needleMapKind NeedleMapKind noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + noWriteLock sync.RWMutex hasRemoteFile bool // if the volume has a remote file MemoryMapMaxSizeMb uint32 super_block.SuperBlock dataFileAccessLock sync.RWMutex - lastModifiedTsSeconds uint64 //unix time in seconds - lastAppendAtNs uint64 //unix time in nanoseconds + asyncRequestsChan chan *needle.AsyncRequest + lastModifiedTsSeconds uint64 // unix time in seconds + lastAppendAtNs uint64 // unix time in nanoseconds lastCompactIndexOffset uint64 lastCompactRevision uint16 @@ -42,18 +45,26 @@ type Volume struct { isCompacting bool volumeInfo *volume_server_pb.VolumeInfo + location *DiskLocation + + lastIoError error } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb} + v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, + asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) + v.startWorker() return } + func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) + v.noWriteLock.RLock() + defer v.noWriteLock.RUnlock() + return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -65,10 +76,24 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) { } return } -func (v *Volume) FileName() (fileName string) { + +func (v *Volume) DataFileName() (fileName string) { return VolumeFileName(v.dir, v.Collection, int(v.Id)) } +func (v *Volume) IndexFileName() (fileName string) { + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) +} + +func (v *Volume) FileName(ext string) (fileName string) { + switch ext { + case ".idx", ".cpx", ".ldb": + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext + } + // .dat, .cpd, .vif + return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext +} + func (v *Volume) Version() needle.Version { if v.volumeInfo.Version != 0 { v.SuperBlock.Version = needle.Version(v.volumeInfo.Version) @@ -146,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 { return v.nm.IndexFileSize() } +func (v *Volume) DiskType() types.DiskType { + return v.location.DiskType +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -169,20 +198,20 @@ func (v *Volume) NeedToReplicate() bool { // except when volume is empty // or when the volume does not have a ttl // or when volumeSizeLimit is 0 when server just starts -func (v *Volume) expired(volumeSizeLimit uint64) bool { +func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool { if volumeSizeLimit == 0 { - //skip if we don't know size limit + // skip if we don't know size limit return false } - if v.ContentSize() == 0 { + if contentSize <= super_block.SuperBlockSize { return false } if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } - glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) + glog.V(2).Infof("volume %d now:%v lastModified:%v", v.Id, time.Now().Unix(), v.lastModifiedTsSeconds) livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60 - glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + glog.V(2).Infof("volume %d ttl:%v lived:%v", v.Id, v.Ttl, livedMinutes) if int64(v.Ttl.Minutes()) < livedMinutes { return true } @@ -205,27 +234,54 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { return false } -func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { - size, _, modTime := v.FileStat() +func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + glog.V(3).Infof("collectStatus volume %d", v.Id) - volumInfo := &master_pb.VolumeInformationMessage{ + if v.nm == nil { + return + } + + ok = true + + maxFileKey = v.nm.MaxFileKey() + datFileSize, modTime, _ = v.DataBackend.GetStat() + fileCount = uint64(v.nm.FileCount()) + deletedCount = uint64(v.nm.DeletedCount()) + deletedSize = v.nm.DeletedSize() + fileCount = uint64(v.nm.FileCount()) + + return +} + +func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) { + + maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus() + + if !ok { + return 0, nil + } + + volumeInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), - Size: size, + Size: uint64(volumeSize), Collection: v.Collection, - FileCount: v.FileCount(), - DeleteCount: v.DeletedCount(), - DeletedByteCount: v.DeletedSize(), - ReadOnly: v.noWriteOrDelete, + FileCount: fileCount, + DeleteCount: deletedCount, + DeletedByteCount: deletedSize, + ReadOnly: v.IsReadOnly(), ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), + DiskType: string(v.location.DiskType), } - volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() - return volumInfo + return maxFileKey, volumeInfo } func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { @@ -237,3 +293,9 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { } return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey() } + +func (v *Volume) IsReadOnly() bool { + v.noWriteLock.RLock() + defer v.noWriteLock.RUnlock() + return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow +} |
