aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/cdb_map.go2
-rw-r--r--go/storage/cdb_map_test.go2
-rw-r--r--go/storage/compact_map_perf_test.go4
-rw-r--r--go/storage/compress.go2
-rw-r--r--go/storage/crc.go4
-rw-r--r--go/storage/file_id.go4
-rw-r--r--go/storage/needle.go15
-rw-r--r--go/storage/needle_map.go4
-rw-r--r--go/storage/needle_read_write.go25
-rw-r--r--go/storage/replica_placement.go8
-rw-r--r--go/storage/store.go73
-rw-r--r--go/storage/store_vacuum.go2
-rw-r--r--go/storage/volume.go132
-rw-r--r--go/storage/volume_info.go4
-rw-r--r--go/storage/volume_super_block.go75
-rw-r--r--go/storage/volume_super_block_test.go23
-rw-r--r--go/storage/volume_ttl.go135
-rw-r--r--go/storage/volume_ttl_test.go60
-rw-r--r--go/storage/volume_vacuum.go16
19 files changed, 472 insertions, 118 deletions
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
index d09a87e2a..1869a563e 100644
--- a/go/storage/cdb_map.go
+++ b/go/storage/cdb_map.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go
index f6a7d42ad..cff7dfa61 100644
--- a/go/storage/cdb_map_test.go
+++ b/go/storage/cdb_map_test.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"math/rand"
"os"
"runtime"
diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go
index 37b23a59f..ef43de25b 100644
--- a/go/storage/compact_map_perf_test.go
+++ b/go/storage/compact_map_perf_test.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"log"
"os"
"testing"
diff --git a/go/storage/compress.go b/go/storage/compress.go
index 846fd0714..a353c9d3a 100644
--- a/go/storage/compress.go
+++ b/go/storage/compress.go
@@ -2,7 +2,7 @@ package storage
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"compress/flate"
"compress/gzip"
"io/ioutil"
diff --git a/go/storage/crc.go b/go/storage/crc.go
index 41f7f6d00..7aa400959 100644
--- a/go/storage/crc.go
+++ b/go/storage/crc.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"hash/crc32"
)
@@ -25,5 +25,5 @@ func (c CRC) Value() uint32 {
func (n *Needle) Etag() string {
bits := make([]byte, 4)
util.Uint32toBytes(bits, uint32(n.Checksum))
- return fmt.Sprintf("%x", bits)
+ return fmt.Sprintf("\"%x\"", bits)
}
diff --git a/go/storage/file_id.go b/go/storage/file_id.go
index 5fcd8c387..ec566826c 100644
--- a/go/storage/file_id.go
+++ b/go/storage/file_id.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"strings"
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 77aa70169..daede321b 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -1,9 +1,9 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/images"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/images"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/hex"
"errors"
"io/ioutil"
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
-func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
+func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
+ if n.Ttl != EMPTY_TTL {
+ n.SetHasTtl()
+ }
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index 6d94ee1ca..dca2e6c5d 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"os"
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 835d7c270..bf452ba37 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -1,8 +1,8 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
"errors"
"fmt"
"io"
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
LastModifiedBytesLength = 5
+ TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
+ if n.HasTtl() {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 696888cd8..c1aca52eb 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -5,10 +5,6 @@ import (
"fmt"
)
-const (
- ReplicaPlacementCount = 9
-)
-
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
-
-func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
- return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index a6a4f399e..e7a9dac94 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -2,9 +2,9 @@ package storage
import (
proto "code.google.com/p/goprotobuf/proto"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"fmt"
@@ -14,6 +14,10 @@ import (
"strings"
)
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt)
+ e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
+func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(volumes, v.Id)
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ s.DeleteVolume(location.volumes, v)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
}
}
diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go
index 5adaa7561..3527e4f59 100644
--- a/go/storage/store_vacuum.go
+++ b/go/storage/store_vacuum.go
@@ -1,7 +1,7 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"strconv"
)
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 7bd8e7467..de79e9107 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -2,7 +2,7 @@ package storage
import (
"bytes"
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"errors"
"fmt"
"io"
@@ -12,22 +12,6 @@ import (
"time"
)
-const (
- SuperBlockSize = 8
-)
-
-type SuperBlock struct {
- Version Version
- ReplicaPlacement *ReplicaPlacement
-}
-
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, SuperBlockSize)
- header[0] = byte(s.Version)
- header[1] = s.ReplicaPlacement.Byte()
- return header
-}
-
type Volume struct {
Id VolumeId
dir string
@@ -38,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return e
}
func (v *Volume) Version() Version {
- return v.SuperBlock.Version
+ return v.SuperBlock.Version()
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -138,44 +124,6 @@ func (v *Volume) Close() {
v.nm.Close()
_ = v.dataFile.Close()
}
-func (v *Volume) maybeWriteSuperBlock() error {
- stat, e := v.dataFile.Stat()
- if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
- return e
- }
- if stat.Size() == 0 {
- v.SuperBlock.Version = CurrentVersion
- _, e = v.dataFile.Write(v.SuperBlock.Bytes())
- if e != nil && os.IsPermission(e) {
- //read-only, but zero length - recreate it!
- if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
- if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
- v.readOnly = false
- }
- }
- }
- }
- return e
-}
-func (v *Volume) readSuperBlock() (err error) {
- if _, err = v.dataFile.Seek(0, 0); err != nil {
- return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
- }
- header := make([]byte, SuperBlockSize)
- if _, e := v.dataFile.Read(header); e != nil {
- return fmt.Errorf("cannot read superblock: %s", e.Error())
- }
- v.SuperBlock, err = ParseSuperBlock(header)
- return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
- superBlock.Version = Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- }
- return
-}
func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
@@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 165af1a19..6410c1784 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -1,13 +1,14 @@
package storage
import (
- "code.google.com/p/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/operation"
)
type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
Collection string
Version Version
FileCount int
@@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
new file mode 100644
index 000000000..a7e86b1c3
--- /dev/null
+++ b/go/storage/volume_super_block.go
@@ -0,0 +1,75 @@
+package storage
+
+import (
+ "github.com/chrislusf/weed-fs/go/glog"
+ "fmt"
+ "os"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+/*
+* Super block currently has 8 bytes allocated for each volume.
+* Byte 0: version, 1 or 2
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Rest bytes: Reserved
+ */
+type SuperBlock struct {
+ version Version
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
+}
+
+func (s *SuperBlock) Version() Version {
+ return s.version
+}
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.version)
+ header[1] = s.ReplicaPlacement.Byte()
+ s.Ttl.ToBytes(header[2:4])
+ return header
+}
+
+func (v *Volume) maybeWriteSuperBlock() error {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
+ return e
+ }
+ if stat.Size() == 0 {
+ v.SuperBlock.version = CurrentVersion
+ _, e = v.dataFile.Write(v.SuperBlock.Bytes())
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
+ if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
+ v.readOnly = false
+ }
+ }
+ }
+ }
+ return e
+}
+func (v *Volume) readSuperBlock() (err error) {
+ if _, err = v.dataFile.Seek(0, 0); err != nil {
+ return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error())
+ }
+ header := make([]byte, SuperBlockSize)
+ if _, e := v.dataFile.Read(header); e != nil {
+ return fmt.Errorf("cannot read superblock: %s", e.Error())
+ }
+ v.SuperBlock, err = ParseSuperBlock(header)
+ return err
+}
+func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
+ superBlock.version = Version(header[0])
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err.Error())
+ }
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
+ return
+}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
new file mode 100644
index 000000000..13db4b194
--- /dev/null
+++ b/go/storage/volume_super_block_test.go
@@ -0,0 +1,23 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestSuperBlockReadWrite(t *testing.T) {
+ rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
+ s := &SuperBlock{
+ version: CurrentVersion,
+ ReplicaPlacement: rp,
+ Ttl: ttl,
+ }
+
+ bytes := s.Bytes()
+
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ t.Fail()
+ }
+
+}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
new file mode 100644
index 000000000..459ee55ba
--- /dev/null
+++ b/go/storage/volume_ttl.go
@@ -0,0 +1,135 @@
+package storage
+
+import (
+ "strconv"
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// translate a readable ttl to internal ttl
+// Supports format example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// save stored bytes to an output with 2 bytes
+func (t TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
+func (t TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t TTL) String() string {
+ if t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 365
+ }
+ return 0
+}
diff --git a/go/storage/volume_ttl_test.go b/go/storage/volume_ttl_test.go
new file mode 100644
index 000000000..216469a4c
--- /dev/null
+++ b/go/storage/volume_ttl_test.go
@@ -0,0 +1,60 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestTTLReadWrite(t *testing.T) {
+ ttl, _ := ReadTTL("")
+ if ttl.Minutes() != 0 {
+ t.Errorf("empty ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("9")
+ if ttl.Minutes() != 9 {
+ t.Errorf("9 ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("8m")
+ if ttl.Minutes() != 8 {
+ t.Errorf("8m ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5h")
+ if ttl.Minutes() != 300 {
+ t.Errorf("5h ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5d")
+ if ttl.Minutes() != 5*24*60 {
+ t.Errorf("5d ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5w")
+ if ttl.Minutes() != 5*7*24*60 {
+ t.Errorf("5w ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5M")
+ if ttl.Minutes() != 5*31*24*60 {
+ t.Errorf("5M ttl:%v", ttl)
+ }
+
+ ttl, _ = ReadTTL("5y")
+ if ttl.Minutes() != 5*365*24*60 {
+ t.Errorf("5y ttl:%v", ttl)
+ }
+
+ output := make([]byte, 2)
+ ttl.ToBytes(output)
+ ttl2 := LoadTTLFromBytes(output)
+ if ttl.Minutes() != ttl2.Minutes() {
+ t.Errorf("ttl:%v ttl2:%v", ttl, ttl2)
+ }
+
+ ttl3 := LoadTTLFromUint32(ttl.ToUint32())
+ if ttl.Minutes() != ttl3.Minutes() {
+ t.Errorf("ttl:%v ttl3:%v", ttl, ttl3)
+ }
+
+}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7d2a38cb8..b348434d2 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -1,10 +1,10 @@
package storage
import (
- "code.google.com/p/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
- _ "time"
+ "time"
)
func (v *Volume) garbageLevel() float64 {
@@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- glog.V(3).Infof("Got Compaction lock...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
+ now := uint64(time.Now().Unix())
+
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {