aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/filer/directory_in_map.go4
-rw-r--r--go/operation/assign_file_id.go5
-rw-r--r--go/operation/submit.go11
-rw-r--r--go/operation/system_message.pb.go12
-rw-r--r--go/proto/system_message.proto1
-rw-r--r--go/storage/needle.go9
-rw-r--r--go/storage/needle_read_write.go21
-rw-r--r--go/storage/replica_placement.go8
-rw-r--r--go/storage/store.go67
-rw-r--r--go/storage/volume.go74
-rw-r--r--go/storage/volume_info.go2
-rw-r--r--go/storage/volume_super_block.go10
-rw-r--r--go/storage/volume_super_block_test.go7
-rw-r--r--go/storage/volume_ttl.go147
-rw-r--r--go/storage/volume_vacuum.go14
-rw-r--r--go/topology/allocate_volume.go7
-rw-r--r--go/topology/collection.go23
-rw-r--r--go/topology/data_node.go6
-rw-r--r--go/topology/topology.go21
-rw-r--r--go/topology/topology_event_handling.go6
-rw-r--r--go/topology/topology_map.go2
-rw-r--r--go/topology/topology_vacuum.go2
-rw-r--r--go/topology/volume_growth.go12
-rw-r--r--go/topology/volume_layout.go13
-rw-r--r--go/weed/benchmark.go2
-rw-r--r--go/weed/compact.go2
-rw-r--r--go/weed/upload.go6
-rw-r--r--go/weed/weed_server/common.go4
-rw-r--r--go/weed/weed_server/filer_server_handlers.go12
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go9
-rw-r--r--go/weed/weed_server/volume_server_handlers_admin.go2
31 files changed, 416 insertions, 105 deletions
diff --git a/go/filer/directory_in_map.go b/go/filer/directory_in_map.go
index 46a626f77..35b4e53c1 100644
--- a/go/filer/directory_in_map.go
+++ b/go/filer/directory_in_map.go
@@ -187,10 +187,10 @@ func (dm *DirectoryManagerInMap) makeDirectory(dirPath string) (dir *DirectoryEn
for i := 1; i < len(parts); i++ {
sub, ok := dir.SubDirectories[parts[i]]
if !ok {
- var err error
+ var err error
sub, err = dm.NewDirectoryEntryInMap(dir, parts[i])
if err != nil {
- return nil, false
+ return nil, false
}
dir.SubDirectories[parts[i]] = sub
created = true
diff --git a/go/operation/assign_file_id.go b/go/operation/assign_file_id.go
index 018e1d763..34d371f37 100644
--- a/go/operation/assign_file_id.go
+++ b/go/operation/assign_file_id.go
@@ -17,7 +17,7 @@ type AssignResult struct {
Error string `json:"error,omitempty"`
}
-func Assign(server string, count int, replication string, collection string) (*AssignResult, error) {
+func Assign(server string, count int, replication string, collection string, ttl string) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.Itoa(count))
if replication != "" {
@@ -26,6 +26,9 @@ func Assign(server string, count int, replication string, collection string) (*A
if collection != "" {
values.Add("collection", collection)
}
+ if ttl != "" {
+ values.Add("ttl", ttl)
+ }
jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
glog.V(2).Info("assign result :", string(jsonBlob))
if err != nil {
diff --git a/go/operation/submit.go b/go/operation/submit.go
index 9191f7d9a..ec45cc320 100644
--- a/go/operation/submit.go
+++ b/go/operation/submit.go
@@ -20,6 +20,7 @@ type FilePart struct {
ModTime int64 //in seconds
Replication string
Collection string
+ Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
}
@@ -32,12 +33,12 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart, replication string, collection string, maxMB int) ([]SubmitResult, error) {
+func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
}
- ret, err := Assign(master, len(files), replication, collection)
+ ret, err := Assign(master, len(files), replication, collection, ttl)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -112,7 +113,7 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
chunks := fi.FileSize/chunkSize + 1
fids := make([]string, 0)
for i := int64(0); i < chunks; i++ {
- id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection)
+ id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
if e != nil {
return 0, e
}
@@ -130,8 +131,8 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
return
}
-func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string) (fid string, size uint32, e error) {
- ret, err := Assign(master, 1, replication, collection)
+func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
+ ret, err := Assign(master, 1, replication, collection, ttl)
if err != nil {
return "", 0, err
}
diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go
index 45ae8a648..9f00dd74d 100644
--- a/go/operation/system_message.pb.go
+++ b/go/operation/system_message.pb.go
@@ -15,12 +15,10 @@ It has these top-level messages:
package operation
import proto "code.google.com/p/goprotobuf/proto"
-import json "encoding/json"
import math "math"
-// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
-var _ = &json.SyntaxError{}
var _ = math.Inf
type VolumeInformationMessage struct {
@@ -33,6 +31,7 @@ type VolumeInformationMessage struct {
ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
+ Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@@ -105,6 +104,13 @@ func (m *VolumeInformationMessage) GetVersion() uint32 {
return Default_VolumeInformationMessage_Version
}
+func (m *VolumeInformationMessage) GetTtl() uint32 {
+ if m != nil && m.Ttl != nil {
+ return *m.Ttl
+ }
+ return 0
+}
+
type JoinMessage struct {
IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto
index 15574ad56..ecd4973f7 100644
--- a/go/proto/system_message.proto
+++ b/go/proto/system_message.proto
@@ -10,6 +10,7 @@ message VolumeInformationMessage {
optional bool read_only = 7;
required uint32 replica_placement = 8;
optional uint32 version = 9 [default=2];
+ optional uint32 ttl = 10;
}
message JoinMessage {
diff --git a/go/storage/needle.go b/go/storage/needle.go
index 77aa70169..3bf627141 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -38,12 +38,13 @@ type Needle struct {
MimeSize uint8 //version2
Mime []byte `comment:"maximum 256 characters"` //version2
LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk
+ Ttl *TTL
Checksum CRC `comment:"CRC32 to check integrity"`
Padding []byte `comment:"Aligned to 8 bytes"`
}
-func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) {
+func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) {
form, fe := r.MultipartReader()
if fe != nil {
glog.V(0).Infoln("MultipartReader [ERROR]", fe)
@@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string
fileName = fileName[:len(fileName)-3]
}
modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ ttl, _ = ReadTTL(r.FormValue("ttl"))
return
}
func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
fname, mimeType, isGzipped := "", "", false
n = new(Needle)
- fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r)
+ fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r)
if e != nil {
return
}
@@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) {
n.LastModified = uint64(time.Now().Unix())
}
n.SetHasLastModifiedDate()
+ if n.Ttl != nil {
+ n.SetHasTtl()
+ }
if fixJpgOrientation {
loweredName := strings.ToLower(fname)
diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go
index 835d7c270..848121ff2 100644
--- a/go/storage/needle_read_write.go
+++ b/go/storage/needle_read_write.go
@@ -14,7 +14,9 @@ const (
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
+ FlagHasTtl = 0x10
LastModifiedBytesLength = 5
+ TtlBytesLength = 2
)
func (n *Needle) DiskSize() int64 {
@@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
}
+ if n.HasTtl() {
+ n.Size = n.Size + TtlBytesLength
+ }
}
size = n.DataSize
util.Uint32toBytes(header[12:16], n.Size)
@@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
return
}
}
+ if n.HasTtl() {
+ n.Ttl.ToBytes(header[0:TtlBytesLength])
+ if _, err = w.Write(header[0:TtlBytesLength]); err != nil {
+ return
+ }
+ }
}
padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
@@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) {
n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
index = index + LastModifiedBytesLength
}
+ if index < lenBytes && n.HasTtl() {
+ n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
+ index = index + TtlBytesLength
+ }
}
func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) {
@@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool {
func (n *Needle) SetHasLastModifiedDate() {
n.Flags = n.Flags | FlagHasLastModifiedDate
}
+func (n *Needle) HasTtl() bool {
+ return n.Flags&FlagHasTtl > 0
+}
+func (n *Needle) SetHasTtl() {
+ n.Flags = n.Flags | FlagHasTtl
+}
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
index 696888cd8..c1aca52eb 100644
--- a/go/storage/replica_placement.go
+++ b/go/storage/replica_placement.go
@@ -5,10 +5,6 @@ import (
"fmt"
)
-const (
- ReplicaPlacementCount = 9
-)
-
type ReplicaPlacement struct {
SameRackCount int
DiffRackCount int
@@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string {
func (rp *ReplicaPlacement) GetCopyCount() int {
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}
-
-func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
- return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index a6a4f399e..ef38ade98 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -14,6 +14,10 @@ import (
"strings"
)
+const (
+ MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
+)
+
type DiskLocation struct {
Directory string
MaxVolumeCount int
@@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error {
rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
+ ttl, e := ReadTTL(ttlString)
+ if e != nil {
+ return e
+ }
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
@@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
if err != nil {
return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string)
}
- e = s.addVolume(VolumeId(id), collection, rt)
+ e = s.addVolume(VolumeId(id), collection, rt, ttl)
} else {
pair := strings.Split(range_string, "-")
start, start_err := strconv.ParseUint(pair[0], 10, 64)
@@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla
return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1])
}
for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), collection, rt); err != nil {
+ if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil {
e = err
}
}
@@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) {
}
return
}
+func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) {
+ e = v.Destroy()
+ if e != nil {
+ return
+ }
+ delete(volumes, v.Id)
+ return
+}
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
if v, found := location.volumes[vid]; found {
@@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
- if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil {
+ glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
+ location.Directory, vid, collection, replicaPlacement, ttl)
+ if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil {
+ if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
}
}
}
@@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) {
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- }
- volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
+ if !v.expired(s.volumeSizeLimit) {
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
+ } else {
+ if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
+ s.DeleteVolume(location.volumes, v)
+ glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ } else {
+ glog.V(0).Infoln("volume", v.Id, "is expired.")
+ }
+ }
}
}
diff --git a/go/storage/volume.go b/go/storage/volume.go
index dec560545..34ae7e386 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -22,12 +22,13 @@ type Volume struct {
SuperBlock
- accessLock sync.Mutex
+ accessLock sync.Mutex
+ lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
e = v.load(true, true)
return
}
@@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
var e error
fileName := v.FileName()
- if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists {
+ if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists {
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
if canWrite {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.lastModifiedTime = uint64(modifiedTime.Unix())
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.dataFile, e = os.Open(fileName + ".dat")
@@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error())
}
}
+ if v.lastModifiedTime < n.LastModified {
+ v.lastModifiedTime = n.LastModified
+ }
return
}
@@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
func (v *Volume) read(n *Needle) (int, error) {
nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if !ok || nv.Offset == 0 {
+ return -1, errors.New("Not Found")
+ }
+ bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version())
+ if err != nil {
+ return bytesRead, err
+ }
+ if !n.HasTtl() {
+ return bytesRead, err
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
+ return bytesRead, nil
}
return -1, errors.New("Not Found")
}
@@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
}
return true
}
+
+// volume is expired if modified time + volume ttl < now
+// except when volume is empty
+// or when the volume does not have a ttl
+// or when volumeSizeLimit is 0 when server just starts
+func (v *Volume) expired(volumeSizeLimit uint64) bool {
+ if volumeSizeLimit == 0 {
+ //skip if we don't know size limit
+ return false
+ }
+ if v.ContentSize() == 0 {
+ return false
+ }
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime)
+ livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60
+ glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ if int64(v.Ttl.Minutes()) < livedMinutes {
+ return true
+ }
+ return false
+}
+
+// wait either maxDelayMinutes or 10% of ttl minutes
+func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool {
+ if v.Ttl == nil || v.Ttl.Minutes() == 0 {
+ return false
+ }
+ removalDelay := v.Ttl.Minutes() / 10
+ if removalDelay > maxDelayMinutes {
+ removalDelay = maxDelayMinutes
+ }
+
+ if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) {
+ return true
+ }
+ return false
+}
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 165af1a19..6a954f743 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -8,6 +8,7 @@ type VolumeInfo struct {
Id VolumeId
Size uint64
ReplicaPlacement *ReplicaPlacement
+ Ttl *TTL
Collection string
Version Version
FileCount int
@@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er
return vi, e
}
vi.ReplicaPlacement = rp
+ vi.Ttl = LoadTTLFromUint32(*m.Ttl)
return vi, nil
}
diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go
index 35030b93e..3fbef44d6 100644
--- a/go/storage/volume_super_block.go
+++ b/go/storage/volume_super_block.go
@@ -2,7 +2,6 @@ package storage
import (
"code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/util"
"fmt"
"os"
)
@@ -15,12 +14,13 @@ const (
* Super block currently has 8 bytes allocated for each volume.
* Byte 0: version, 1 or 2
* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live in minutes
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Rest bytes: Reserved
*/
type SuperBlock struct {
version Version
ReplicaPlacement *ReplicaPlacement
- Ttl uint16
+ Ttl *TTL
}
func (s *SuperBlock) Version() Version {
@@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.version)
header[1] = s.ReplicaPlacement.Byte()
- util.Uint16toBytes(header[2:4], s.Ttl)
+ s.Ttl.ToBytes(header[2:4])
return header
}
@@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
- superBlock.Ttl = util.BytesToUint16(header[2:4])
+ superBlock.Ttl = LoadTTLFromBytes(header[2:4])
return
}
diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go
index 19a1bb757..13db4b194 100644
--- a/go/storage/volume_super_block_test.go
+++ b/go/storage/volume_super_block_test.go
@@ -6,16 +6,17 @@ import (
func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
+ ttl, _ := ReadTTL("15d")
s := &SuperBlock{
version: CurrentVersion,
ReplicaPlacement: rp,
- Ttl: uint16(35),
+ Ttl: ttl,
}
bytes := s.Bytes()
- if !(bytes[2] == 0 && bytes[3] == 35) {
- println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
+ if !(bytes[2] == 15 && bytes[3] == Day) {
+ println("byte[2]:", bytes[2], "byte[3]:", bytes[3])
t.Fail()
}
diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go
new file mode 100644
index 000000000..5ff43e24e
--- /dev/null
+++ b/go/storage/volume_ttl.go
@@ -0,0 +1,147 @@
+package storage
+
+import (
+ "strconv"
+)
+
+var (
+ TtlRange []uint16
+)
+
+const (
+ //stored unit types
+ Empty byte = iota
+ Minute
+ Hour
+ Day
+ Week
+ Month
+ Year
+)
+
+func init() {
+ TtlRange = []uint16{3, 10, 30,
+ 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12,
+ 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31,
+ 44888 /*1 month*/, 65535,
+ }
+}
+
+type TTL struct {
+ count byte
+ unit byte
+}
+
+var EMPTY_TTL = &TTL{}
+
+// translate a readable ttl to internal ttl
+// Supports format example:
+// 3m: 3 minutes
+// 4h: 4 hours
+// 5d: 5 days
+// 6w: 6 weeks
+// 7M: 7 months
+// 8y: 8 years
+func ReadTTL(ttlString string) (*TTL, error) {
+ if ttlString == "" {
+ return EMPTY_TTL, nil
+ }
+ ttlBytes := []byte(ttlString)
+ unitByte := ttlBytes[len(ttlBytes)-1]
+ countBytes := ttlBytes[0 : len(ttlBytes)-1]
+ if '0' <= unitByte && unitByte <= '9' {
+ countBytes = ttlBytes
+ unitByte = 'm'
+ }
+ count, err := strconv.Atoi(string(countBytes))
+ unit := toStoredByte(unitByte)
+ return &TTL{count: byte(count), unit: unit}, err
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromBytes(input []byte) (t *TTL) {
+ return &TTL{count: input[0], unit: input[1]}
+}
+
+// read stored bytes to a ttl
+func LoadTTLFromUint32(ttl uint32) (t *TTL) {
+ input := make([]byte, 2)
+ input[1] = byte(ttl)
+ input[0] = byte(ttl >> 8)
+ return LoadTTLFromBytes(input)
+}
+
+// save stored bytes to an output with 2 bytes
+func (t TTL) ToBytes(output []byte) {
+ output[0] = t.count
+ output[1] = t.unit
+}
+
+func (t TTL) ToUint32() (output uint32) {
+ output = uint32(t.count) << 8
+ output += uint32(t.unit)
+ return output
+}
+
+func (t TTL) String() string {
+ if t.count == 0 {
+ return ""
+ }
+ if t.unit == Empty {
+ return ""
+ }
+ countString := strconv.Itoa(int(t.count))
+ switch t.unit {
+ case Minute:
+ return countString + "m"
+ case Hour:
+ return countString + "h"
+ case Day:
+ return countString + "d"
+ case Week:
+ return countString + "w"
+ case Month:
+ return countString + "M"
+ case Year:
+ return countString + "y"
+ }
+ return ""
+}
+
+func toStoredByte(readableUnitByte byte) byte {
+ switch readableUnitByte {
+ case 'm':
+ return Minute
+ case 'h':
+ return Hour
+ case 'd':
+ return Day
+ case 'w':
+ return Week
+ case 'M':
+ return Month
+ case 'Y':
+ return Year
+ }
+ return 0
+}
+
+func (t TTL) Minutes() uint32 {
+ switch t.unit {
+ case Empty:
+ return 0
+ case Minute:
+ return uint32(t.count)
+ case Hour:
+ return uint32(t.count) * 60
+ case Day:
+ return uint32(t.count) * 60 * 24
+ case Week:
+ return uint32(t.count) * 60 * 24 * 7
+ case Month:
+ return uint32(t.count) * 60 * 24 * 31
+ case Year:
+ return uint32(t.count) * 60 * 24 * 31 * 365
+ }
+ return 0
+}
diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go
index 7d2a38cb8..706a1f951 100644
--- a/go/storage/volume_vacuum.go
+++ b/go/storage/volume_vacuum.go
@@ -4,7 +4,7 @@ import (
"code.google.com/p/weed-fs/go/glog"
"fmt"
"os"
- _ "time"
+ "time"
)
func (v *Volume) garbageLevel() float64 {
@@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 {
func (v *Volume) Compact() error {
glog.V(3).Infof("Compacting ...")
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- glog.V(3).Infof("Got Compaction lock...")
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
filePath := v.FileName()
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
nm := NewNeedleMap(idx)
new_offset := int64(SuperBlockSize)
+ now := uint64(time.Now().Unix())
+
err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error {
_, err = dst.Write(superBlock.Bytes())
return err
}, true, func(n *Needle, offset int64) error {
+ if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ return nil
+ }
nv, ok := v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 {
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index 77b4ac508..4aeef35f7 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -12,11 +12,12 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
- values.Add("collection", collection)
- values.Add("replication", rp.String())
+ values.Add("collection", option.Collection)
+ values.Add("replication", option.ReplicaPlacement.String())
+ values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
if err != nil {
return err
diff --git a/go/topology/collection.go b/go/topology/collection.go
index b21122d22..c014231af 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -1,33 +1,34 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
)
type Collection struct {
Name string
volumeSizeLimit uint64
- replicaType2VolumeLayout []*VolumeLayout
+ storageType2VolumeLayout map[string]*VolumeLayout
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
+ c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
return c
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
- replicaPlacementIndex := rp.GetReplicationLevelIndex()
- if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
- glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
- c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+ keyString := rp.String()
+ if ttl != nil {
+ keyString += ttl.String()
}
- return c.replicaType2VolumeLayout[replicaPlacementIndex]
+ if c.storageType2VolumeLayout[keyString] == nil {
+ c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ }
+ return c.storageType2VolumeLayout[keyString]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
@@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ae80e08bb..c67c5c1c1 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
}
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
- for vid, _ := range dn.volumes {
+ for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
+ deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
@@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
+ return
}
func (dn *DataNode) GetDataCenter() *DataCenter {
diff --git a/go/topology/topology.go b/go/topology/topology.go
index f1daffb53..acdef5e36 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
- dn.UpdateVolumes(volumeInfos)
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 7398ff9bf..1e630e149 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index f66d4c251..d6400c988 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} {
m["DataCenters"] = dcs
var layouts []interface{}
for _, c := range t.collectionMap {
- for _, layout := range c.replicaType2VolumeLayout {
+ for _, layout := range c.storageType2VolumeLayout {
if layout != nil {
tmp := layout.ToMap()
tmp["collection"] = c.Name
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index a1d6d2564..9eaca37d4 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 4965e3ba0..778aa038a 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
+ Ttl *storage.TTL
DataCenter string
Rack string
DataNode string
@@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+ if err := AllocateVolume(server, vid, option); err == nil {
+ vi := storage.VolumeInfo{
+ Id: vid,
+ Size: 0,
+ Collection: option.Collection,
+ ReplicaPlacement: option.ReplicaPlacement,
+ Ttl: option.Ttl,
+ Version: storage.CurrentVersion,
+ }
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 538acb54c..1e55072a3 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -11,15 +11,17 @@ import (
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
+ ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
+ ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
@@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
+func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.removeFromWritable(v.Id)
+ delete(vl.vid2location, v.Id)
+}
+
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
@@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
+ m["ttl"] = vl.ttl.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index eab923751..27aebaef0 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
start := time.Now()
fileSize := int64(*b.fileSize + rand.Intn(64))
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
- if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil {
+ if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
if _, ok := serverLimitChan[fp.Server]; !ok {
serverLimitChan[fp.Server] = make(chan bool, 7)
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 580f3f98d..57a02261f 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/upload.go b/go/weed/upload.go
index b59313a2a..c21499dd0 100644
--- a/go/weed/upload.go
+++ b/go/weed/upload.go
@@ -12,6 +12,7 @@ var (
uploadReplication *string
uploadCollection *string
uploadDir *string
+ uploadTtl *string
include *string
maxMB *int
)
@@ -24,6 +25,7 @@ func init() {
include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
uploadReplication = cmdUpload.Flag.String("replication", "", "replication type")
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
+ uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
}
@@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
+ results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB)
+ results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go
index a547d7462..49e84378c 100644
--- a/go/weed/weed_server/common.go
+++ b/go/weed/weed_server/common.go
@@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r)
+ fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, pe)
return
}
debug("assigning file id for", fname)
- assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"))
+ assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
if ae != nil {
writeJsonError(w, r, ae)
return
diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go
index 0f83352a9..f760030f3 100644
--- a/go/weed/weed_server/filer_server_handlers.go
+++ b/go/weed/weed_server/filer_server_handlers.go
@@ -109,7 +109,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
- assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection)
+ assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
writeJsonError(w, r, ae)
@@ -131,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
+ glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
writeJsonError(w, r, do_err)
return
}
defer resp.Body.Close()
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- glog.V(0).Infoln("failing to upload to volume server", ra_err.Error())
+ glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
writeJsonError(w, r, ra_err)
return
}
@@ -146,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
var ret operation.UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", string(resp_body))
+ glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
writeJsonError(w, r, unmarshal_err)
return
}
if ret.Error != "" {
- glog.V(0).Infoln("failing to post to volume server", ret.Error)
+ glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
writeJsonError(w, r, errors.New(ret.Error))
return
}
@@ -169,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
- glog.V(0).Infoln("failing to write to filer server", db_err.Error())
+ glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
writeJsonError(w, r, db_err)
return
}
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index d50075fd5..1458bf3e6 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
}
- ms.Topo.RegisterVolumes(joinMessage)
+ ms.Topo.ProcessJoinMessage(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
@@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ ttl, err := storage.ReadTTL(r.FormValue("ttl"))
+ if err != nil {
+ return nil, err
+ }
volumeGrowOption := &topology.VolumeGrowOption{
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
DataNode: r.FormValue("dataNode"),
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index 6d285524a..1c01fdfd0 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"))
+ err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
writeJsonQuiet(w, r, map[string]string{"error": ""})
} else {