aboutsummaryrefslogtreecommitdiff
path: root/go/storage/volume.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
committerChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
commitb9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 (patch)
tree719442dc72cc30958e54e4f7e59076796b6775e9 /go/storage/volume.go
parenta092794804b2f7cbd656e439305d29bfa96ad2b9 (diff)
downloadseaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.tar.xz
seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.zip
add TTL support
The volume TTL and file TTL are not necessarily the same. as long as file TTL is smaller than volume TTL, it'll be fine. volume TTL is used when assigning file id, e.g. http://.../dir/assign?ttl=3h file TTL is used when uploading
Diffstat (limited to 'go/storage/volume.go')
-rw-r--r--go/storage/volume.go74
1 files changed, 68 insertions, 6 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go
index dec560545..34ae7e386 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -22,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}