aboutsummaryrefslogtreecommitdiff
path: root/go/storage/volume.go
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2014-10-21 15:50:48 +0800
committeryourchanges <yourchanges@gmail.com>2014-10-21 15:50:48 +0800
commitf7bcd8e958ef185baeca0c455a397d49fcb62256 (patch)
treebb2a2de4fcb7f9ac7e1d65a78e82cbe1f5f17e36 /go/storage/volume.go
parent78ccbbf3d0766e1ececf91fa00766d1ed33050cc (diff)
parenta3c17f17b144c4298409fddda2d6bc4b39d38ca5 (diff)
downloadseaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.tar.xz
seaweedfs-f7bcd8e958ef185baeca0c455a397d49fcb62256.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'go/storage/volume.go')
-rw-r--r--go/storage/volume.go132
1 files changed, 70 insertions, 62 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 7bd8e7467..de79e9107 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -2,7 +2,7 @@ package storage
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"errors"
"fmt"
"io"
@@ -12,22 +12,6 @@ import (
"time"
)
-const (
- SuperBlockSize = 8
-)
-
-type SuperBlock struct {
- Version Version
- ReplicaPlacement *ReplicaPlacement
-}
-
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, SuperBlockSize)
- header[0] = byte(s.Version)
- header[1] = s.ReplicaPlacement.Byte()
- return header
-}
-
type Volume struct {
Id VolumeId
dir string
@@ -38,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return e
}
func (v *Volume) Version() Version {
- return v.SuperBlock.Version
+ return v.SuperBlock.Version()
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -138,44 +124,6 @@ func (v *Volume) Close() {
v.nm.Close()
_ = v.dataFile.Close()
}
-func (v *Volume) maybeWriteSuperBlock() error {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.Version = CurrentVersion
- _, e = v.dataFile.Write(v.SuperBlock.Bytes())
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
- }
- }
- }
- }
- return e
-}
-func (v *Volume) readSuperBlock() (err error) {
- if _, err = v.dataFile.Seek(0, 0); err != nil {
- return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
- }
- header := make([]byte, SuperBlockSize)
- if _, e := v.dataFile.Read(header); e != nil {
- return fmt.Errorf("cannot read superblock: %s", e.Error())
- }
- v.SuperBlock, err = ParseSuperBlock(header)
- return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
- superBlock.Version = Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- }
- return
-}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
@@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}