diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-12-30 13:05:50 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-12-30 13:05:50 +0800 |
| commit | 70da715d8d917527291b35fb069fac077d17b868 (patch) | |
| tree | b89bad02094cc7131bc2c9f64df13e15f9de9914 /weed/storage/volume.go | |
| parent | 93a7df500ffeed766e395907e860b1733040ff23 (diff) | |
| parent | 09043c8e5a3b43add589344d28d4f57e90c83f70 (diff) | |
| download | seaweedfs-70da715d8d917527291b35fb069fac077d17b868.tar.xz seaweedfs-70da715d8d917527291b35fb069fac077d17b868.zip | |
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'weed/storage/volume.go')
| -rw-r--r-- | weed/storage/volume.go | 157 |
1 files changed, 111 insertions, 46 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go index a1d9d7e8d..acede66bf 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -2,50 +2,58 @@ package storage import ( "fmt" - - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - - "os" "path" "strconv" "sync" "time" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/glog" ) type Volume struct { - Id needle.VolumeId - dir string - Collection string - dataFile *os.File - nm NeedleMapper - compactingWg sync.WaitGroup - needleMapKind NeedleMapType - readOnly bool - - SuperBlock - - dataFileAccessLock sync.Mutex + Id needle.VolumeId + dir string + Collection string + DataBackend backend.BackendStorageFile + nm NeedleMapper + needleMapKind NeedleMapType + noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + hasRemoteFile bool // if the volume has a remote file + MemoryMapMaxSizeMb uint32 + + super_block.SuperBlock + + dataFileAccessLock sync.RWMutex lastModifiedTsSeconds uint64 //unix time in seconds lastAppendAtNs uint64 //unix time in nanoseconds lastCompactIndexOffset uint64 lastCompactRevision uint16 + + isCompacting bool + + volumeInfo *volume_server_pb.VolumeInfo } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} + v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb} + v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) return } func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -60,38 +68,84 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) { func (v *Volume) FileName() (fileName string) { return VolumeFileName(v.dir, v.Collection, int(v.Id)) } -func (v *Volume) DataFile() *os.File { - return v.dataFile -} func (v *Volume) Version() needle.Version { - return v.SuperBlock.Version() + if v.volumeInfo.Version != 0 { + v.SuperBlock.Version = needle.Version(v.volumeInfo.Version) + } + return v.SuperBlock.Version } func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() - if v.dataFile == nil { + if v.DataBackend == nil { return } - stat, e := v.dataFile.Stat() + datFileSize, modTime, e := v.DataBackend.GetStat() if e == nil { - return uint64(stat.Size()), v.nm.IndexFileSize(), stat.ModTime() + return uint64(datFileSize), v.nm.IndexFileSize(), modTime } - glog.V(0).Infof("Failed to read file size %s %v", v.dataFile.Name(), e) + glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e) return // -1 causes integer overflow and the volume to become unwritable. } -func (v *Volume) IndexFileSize() uint64 { - return v.nm.IndexFileSize() +func (v *Volume) ContentSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.ContentSize() +} + +func (v *Volume) DeletedSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.DeletedSize() } func (v *Volume) FileCount() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } return uint64(v.nm.FileCount()) } +func (v *Volume) DeletedCount() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return uint64(v.nm.DeletedCount()) +} + +func (v *Volume) MaxFileKey() types.NeedleId { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.MaxFileKey() +} + +func (v *Volume) IndexFileSize() uint64 { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return 0 + } + return v.nm.IndexFileSize() +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -100,9 +154,9 @@ func (v *Volume) Close() { v.nm.Close() v.nm = nil } - if v.dataFile != nil { - _ = v.dataFile.Close() - v.dataFile = nil + if v.DataBackend != nil { + _ = v.DataBackend.Close() + v.DataBackend = nil stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() } } @@ -111,10 +165,6 @@ func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } -func (v *Volume) ContentSize() uint64 { - return v.nm.ContentSize() -} - // volume is expired if modified time + volume ttl < now // except when volume is empty // or when the volume does not have a ttl @@ -157,18 +207,33 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { size, _, modTime := v.FileStat() - return &master_pb.VolumeInformationMessage{ + + volumInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), Size: size, Collection: v.Collection, - FileCount: uint64(v.nm.FileCount()), - DeleteCount: uint64(v.nm.DeletedCount()), - DeletedByteCount: v.nm.DeletedSize(), - ReadOnly: v.readOnly, + FileCount: v.FileCount(), + DeleteCount: v.DeletedCount(), + DeletedByteCount: v.DeletedSize(), + ReadOnly: v.noWriteOrDelete, ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), } + + volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + + return volumInfo +} + +func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { + if v.volumeInfo == nil { + return + } + if len(v.volumeInfo.GetFiles()) == 0 { + return + } + return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey() } |
