aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume.go')
-rw-r--r--weed/storage/volume.go90
1 files changed, 56 insertions, 34 deletions
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index e85696eab..7da83de7a 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -2,18 +2,19 @@ package storage
import (
"fmt"
+ "path"
+ "strconv"
+ "sync"
+ "time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "path"
- "strconv"
- "sync"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -21,15 +22,17 @@ type Volume struct {
Id needle.VolumeId
dir string
Collection string
- DataBackend backend.DataStorageBackend
+ DataBackend backend.BackendStorageFile
nm NeedleMapper
needleMapKind NeedleMapType
- readOnly bool
+ noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
+ noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
+ hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
- SuperBlock
+ super_block.SuperBlock
- dataFileAccessLock sync.Mutex
+ dataFileAccessLock sync.RWMutex
lastModifiedTsSeconds uint64 //unix time in seconds
lastAppendAtNs uint64 //unix time in nanoseconds
@@ -37,18 +40,20 @@ type Volume struct {
lastCompactRevision uint16
isCompacting bool
+
+ volumeInfo *volume_server_pb.VolumeInfo
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
+ v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
return
}
func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly)
+ return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -65,12 +70,15 @@ func (v *Volume) FileName() (fileName string) {
}
func (v *Volume) Version() needle.Version {
- return v.SuperBlock.Version()
+ if v.volumeInfo.Version != 0 {
+ v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
+ }
+ return v.SuperBlock.Version
}
func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.DataBackend == nil {
return
@@ -80,13 +88,13 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
if e == nil {
return uint64(datFileSize), v.nm.IndexFileSize(), modTime
}
- glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.String(), e)
+ glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e)
return // -1 causes integer overflow and the volume to become unwritable.
}
func (v *Volume) ContentSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -94,8 +102,8 @@ func (v *Volume) ContentSize() uint64 {
}
func (v *Volume) DeletedSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -103,8 +111,8 @@ func (v *Volume) DeletedSize() uint64 {
}
func (v *Volume) FileCount() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -112,8 +120,8 @@ func (v *Volume) FileCount() uint64 {
}
func (v *Volume) DeletedCount() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -121,8 +129,8 @@ func (v *Volume) DeletedCount() uint64 {
}
func (v *Volume) MaxFileKey() types.NeedleId {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -130,8 +138,8 @@ func (v *Volume) MaxFileKey() types.NeedleId {
}
func (v *Volume) IndexFileSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -172,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
@@ -200,18 +208,32 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, modTime := v.FileStat()
- return &master_pb.VolumeInformationMessage{
+ volumInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
Size: size,
Collection: v.Collection,
- FileCount: uint64(v.FileCount()),
- DeleteCount: uint64(v.DeletedCount()),
+ FileCount: v.FileCount(),
+ DeleteCount: v.DeletedCount(),
DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.readOnly,
+ ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
}
+
+ volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey()
+
+ return volumInfo
+}
+
+func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
+ if v.volumeInfo == nil {
+ return
+ }
+ if len(v.volumeInfo.GetFiles()) == 0 {
+ return
+ }
+ return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey()
}