aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/needle.go9
-rw-r--r--go/storage/needle_read_write.go21
-rw-r--r--go/storage/replica_placement.go8
-rw-r--r--go/storage/store.go67
-rw-r--r--go/storage/volume.go74
-rw-r--r--go/storage/volume_info.go2
-rw-r--r--go/storage/volume_super_block.go10
-rw-r--r--go/storage/volume_super_block_test.go7
-rw-r--r--go/storage/volume_ttl.go147
-rw-r--r--go/storage/volume_vacuum.go14
10 files changed, 311 insertions, 48 deletions
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 77aa70169..3bf627141 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
-func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
+func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
+ if n.Ttl != nil {
+ n.SetHasTtl()
+ }
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 835d7c270..848121ff2 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
LastModifiedBytesLength = 5
+ TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
+ if n.HasTtl() {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 696888cd8..c1aca52eb 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -5,10 +5,6 @@ import (
"fmt"
)
-const (
- ReplicaPlacementCount = 9
-)
-
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
-
-func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
- return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index a6a4f399e..ef38ade98 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -14,6 +14,10 @@ import (
"strings"
)
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt)
+ e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
+func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(volumes, v.Id)
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ s.DeleteVolume(location.volumes, v)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
}
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index dec560545..34ae7e386 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -22,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 165af1a19..6a954f743 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -8,6 +8,7 @@ type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
Collection string
Version Version
FileCount int
@@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
index 35030b93e..3fbef44d6 100644
--- a/go/storage/volume_super_block.go
+++ b/go/storage/volume_super_block.go
@@ -2,7 +2,6 @@ package storage
import (
"code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
"fmt"
"os"
)
@@ -15,12 +14,13 @@ const (
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live in minutes
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Rest bytes: Reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
- Ttl uint16
+ Ttl *TTL
}
func (s *SuperBlock) Version() Version {
@@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
- util.Uint16toBytes(header[2:4], s.Ttl)
+ s.Ttl.ToBytes(header[2:4])
return header
}
@@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
- superBlock.Ttl = util.BytesToUint16(header[2:4])
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
return
}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
index 19a1bb757..13db4b194 100644
--- a/go/storage/volume_super_block_test.go
+++ b/go/storage/volume_super_block_test.go
@@ -6,16 +6,17 @@ import (
func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
s := &SuperBlock{
version: CurrentVersion,
ReplicaPlacement: rp,
- Ttl: uint16(35),
+ Ttl: ttl,
}
bytes := s.Bytes()
- if !(bytes[2] == 0 && bytes[3] == 35) {
- println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
t.Fail()
}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
new file mode 100644
index 000000000..5ff43e24e
--- /dev/null
+++ b/go/storage/volume_ttl.go
@@ -0,0 +1,147 @@
+package storage
+
+import (
+ "strconv"
+)
+
+var (
+ TtlRange []uint16
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+func init() {
+ TtlRange = []uint16{3, 10, 30,
+ 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12,
+ 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31,
+ 44888 /*1 month*/, 65535,
+ }
+}
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// translate a readable ttl to internal ttl
+// Supports format example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// save stored bytes to an output with 2 bytes
+func (t TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
+func (t TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t TTL) String() string {
+ if t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'Y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 31 * 365
+ }
+ return 0
+}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7d2a38cb8..706a1f951 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -4,7 +4,7 @@ import (
"code.google.com/p/weed-fs/go/glog"
"fmt"
"os"
- _ "time"
+ "time"
)
func (v *Volume) garbageLevel() float64 {
@@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- glog.V(3).Infof("Got Compaction lock...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
+ now := uint64(time.Now().Unix())
+
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {