aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
committerChris Lu <chris.lu@gmail.com>2014-09-20 12:38:59 -0700
commitb9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 (patch)
tree719442dc72cc30958e54e4f7e59076796b6775e9 /go/storage
parenta092794804b2f7cbd656e439305d29bfa96ad2b9 (diff)
downloadseaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.tar.xz
seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.zip
add TTL support
The volume TTL and file TTL are not necessarily the same. as long as file TTL is smaller than volume TTL, it'll be fine. volume TTL is used when assigning file id, e.g. http://.../dir/assign?ttl=3h file TTL is used when uploading
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/needle.go9
-rw-r--r--go/storage/needle_read_write.go21
-rw-r--r--go/storage/replica_placement.go8
-rw-r--r--go/storage/store.go67
-rw-r--r--go/storage/volume.go74
-rw-r--r--go/storage/volume_info.go2
-rw-r--r--go/storage/volume_super_block.go10
-rw-r--r--go/storage/volume_super_block_test.go7
-rw-r--r--go/storage/volume_ttl.go147
-rw-r--r--go/storage/volume_vacuum.go14
10 files changed, 311 insertions, 48 deletions
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 77aa70169..3bf627141 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
-func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
+func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
+ if n.Ttl != nil {
+ n.SetHasTtl()
+ }
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 835d7c270..848121ff2 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
LastModifiedBytesLength = 5
+ TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
+ if n.HasTtl() {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 696888cd8..c1aca52eb 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -5,10 +5,6 @@ import (
"fmt"
)
-const (
- ReplicaPlacementCount = 9
-)
-
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
-
-func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
- return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index a6a4f399e..ef38ade98 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -14,6 +14,10 @@ import (
"strings"
)
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt)
+ e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
+func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(volumes, v.Id)
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ s.DeleteVolume(location.volumes, v)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
}
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index dec560545..34ae7e386 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -22,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 165af1a19..6a954f743 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -8,6 +8,7 @@ type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
Collection string
Version Version
FileCount int
@@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
index 35030b93e..3fbef44d6 100644
--- a/go/storage/volume_super_block.go
+++ b/go/storage/volume_super_block.go
@@ -2,7 +2,6 @@ package storage
import (
"code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
"fmt"
"os"
)
@@ -15,12 +14,13 @@ const (
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live in minutes
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Rest bytes: Reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
- Ttl uint16
+ Ttl *TTL
}
func (s *SuperBlock) Version() Version {
@@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
- util.Uint16toBytes(header[2:4], s.Ttl)
+ s.Ttl.ToBytes(header[2:4])
return header
}
@@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
- superBlock.Ttl = util.BytesToUint16(header[2:4])
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
return
}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
index 19a1bb757..13db4b194 100644
--- a/go/storage/volume_super_block_test.go
+++ b/go/storage/volume_super_block_test.go
@@ -6,16 +6,17 @@ import (
func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
s := &SuperBlock{
version: CurrentVersion,
ReplicaPlacement: rp,
- Ttl: uint16(35),
+ Ttl: ttl,
}
bytes := s.Bytes()
- if !(bytes[2] == 0 && bytes[3] == 35) {
- println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
t.Fail()
}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
new file mode 100644
index 000000000..5ff43e24e
--- /dev/null
+++ b/go/storage/volume_ttl.go
@@ -0,0 +1,147 @@
+package storage
+
+import (
+ "strconv"
+)
+
+var (
+ TtlRange []uint16
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+func init() {
+ TtlRange = []uint16{3, 10, 30,
+ 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12,
+ 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31,
+ 44888 /*1 month*/, 65535,
+ }
+}
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// translate a readable ttl to internal ttl
+// Supports format example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// save stored bytes to an output with 2 bytes
+func (t TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
+func (t TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t TTL) String() string {
+ if t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'Y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 31 * 365
+ }
+ return 0
+}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7d2a38cb8..706a1f951 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -4,7 +4,7 @@ import (
"code.google.com/p/weed-fs/go/glog"
"fmt"
"os"
- _ "time"
+ "time"
)
func (v *Volume) garbageLevel() float64 {
@@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- glog.V(3).Infof("Got Compaction lock...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
+ now := uint64(time.Now().Unix())
+
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {