diff options
Diffstat (limited to 'weed/storage/volume.go')
| -rw-r--r-- | weed/storage/volume.go | 207 |
1 files changed, 160 insertions, 47 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 07c72ecb4..73fdb417d 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -2,75 +2,154 @@ package storage import ( "fmt" - "os" "path" + "strconv" "sync" "time" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/glog" ) type Volume struct { - Id VolumeId - dir string - Collection string - dataFile *os.File - nm NeedleMapper - compactingWg sync.WaitGroup - needleMapKind NeedleMapType - readOnly bool - - SuperBlock - - dataFileAccessLock sync.Mutex - lastModifiedTime uint64 //unix time in seconds + Id needle.VolumeId + dir string + Collection string + DataBackend backend.BackendStorageFile + nm NeedleMapper + needleMapKind NeedleMapType + noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + hasRemoteFile bool // if the volume has a remote file + MemoryMapMaxSizeMb uint32 + + super_block.SuperBlock + + dataFileAccessLock sync.RWMutex + asyncRequestsChan chan *needle.AsyncRequest + lastModifiedTsSeconds uint64 // unix time in seconds + lastAppendAtNs uint64 // unix time in nanoseconds lastCompactIndexOffset uint64 lastCompactRevision uint16 + + isCompacting bool + + volumeInfo *volume_server_pb.VolumeInfo + location *DiskLocation } -func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL, preallocate int64) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} + v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, + asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} + v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) + v.startWorker() return } + func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } -func (v *Volume) FileName() (fileName string) { - if v.Collection == "" { - fileName = path.Join(v.dir, v.Id.String()) +func VolumeFileName(dir string, collection string, id int) (fileName string) { + idString := strconv.Itoa(id) + if collection == "" { + fileName = path.Join(dir, idString) } else { - fileName = path.Join(v.dir, v.Collection+"_"+v.Id.String()) + fileName = path.Join(dir, collection+"_"+idString) } return } -func (v *Volume) DataFile() *os.File { - return v.dataFile + +func (v *Volume) FileName() (fileName string) { + return VolumeFileName(v.dir, v.Collection, int(v.Id)) } -func (v *Volume) Version() Version { - return v.SuperBlock.Version() +func (v *Volume) Version() needle.Version { + if v.volumeInfo.Version != 0 { + v.SuperBlock.Version = needle.Version(v.volumeInfo.Version) + } + return v.SuperBlock.Version } -func (v *Volume) Size() int64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() +func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() - if v.dataFile == nil { - return 0 + if v.DataBackend == nil { + return } - stat, e := v.dataFile.Stat() + datFileSize, modTime, e := v.DataBackend.GetStat() if e == nil { - return stat.Size() + return uint64(datFileSize), v.nm.IndexFileSize(), modTime + } + glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e) + return // -1 causes integer overflow and the volume to become unwritable. +} + +func (v *Volume) ContentSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.ContentSize() +} + +func (v *Volume) DeletedSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.DeletedSize() +} + +func (v *Volume) FileCount() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 } - glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) - return 0 // -1 causes integer overflow and the volume to become unwritable. + return uint64(v.nm.FileCount()) +} + +func (v *Volume) DeletedCount() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return uint64(v.nm.DeletedCount()) +} + +func (v *Volume) MaxFileKey() types.NeedleId { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.MaxFileKey() +} + +func (v *Volume) IndexFileSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.IndexFileSize() } // Close cleanly shuts down this volume @@ -81,9 +160,10 @@ func (v *Volume) Close() { v.nm.Close() v.nm = nil } - if v.dataFile != nil { - _ = v.dataFile.Close() - v.dataFile = nil + if v.DataBackend != nil { + _ = v.DataBackend.Close() + v.DataBackend = nil + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() } } @@ -91,17 +171,13 @@ func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } -func (v *Volume) ContentSize() uint64 { - return v.nm.ContentSize() -} - // volume is expired if modified time + volume ttl < now // except when volume is empty // or when the volume does not have a ttl // or when volumeSizeLimit is 0 when server just starts func (v *Volume) expired(volumeSizeLimit uint64) bool { if volumeSizeLimit == 0 { - //skip if we don't know size limit + // skip if we don't know size limit return false } if v.ContentSize() == 0 { @@ -110,9 +186,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool { if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } - glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) - livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 - glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60 + glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) if int64(v.Ttl.Minutes()) < livedMinutes { return true } @@ -129,8 +205,45 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { removalDelay = maxDelayMinutes } - if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTsSeconds < uint64(time.Now().Unix()) { return true } return false } + +func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { + size, _, modTime := v.FileStat() + + volumInfo := &master_pb.VolumeInformationMessage{ + Id: uint32(v.Id), + Size: size, + Collection: v.Collection, + FileCount: v.FileCount(), + DeleteCount: v.DeletedCount(), + DeletedByteCount: v.DeletedSize(), + ReadOnly: v.IsReadOnly(), + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Version: uint32(v.Version()), + Ttl: v.Ttl.ToUint32(), + CompactRevision: uint32(v.SuperBlock.CompactionRevision), + ModifiedAtSecond: modTime.Unix(), + } + + volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + + return volumInfo +} + +func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { + if v.volumeInfo == nil { + return + } + if len(v.volumeInfo.GetFiles()) == 0 { + return + } + return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey() +} + +func (v *Volume) IsReadOnly() bool { + return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow +} |
