diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/cdb_map.go | 2 | ||||
| -rw-r--r-- | go/storage/cdb_map_test.go | 2 | ||||
| -rw-r--r-- | go/storage/compact_map_perf_test.go | 4 | ||||
| -rw-r--r-- | go/storage/compress.go | 2 | ||||
| -rw-r--r-- | go/storage/crc.go | 4 | ||||
| -rw-r--r-- | go/storage/file_id.go | 4 | ||||
| -rw-r--r-- | go/storage/needle.go | 15 | ||||
| -rw-r--r-- | go/storage/needle_map.go | 4 | ||||
| -rw-r--r-- | go/storage/needle_read_write.go | 25 | ||||
| -rw-r--r-- | go/storage/replica_placement.go | 8 | ||||
| -rw-r--r-- | go/storage/store.go | 73 | ||||
| -rw-r--r-- | go/storage/store_vacuum.go | 2 | ||||
| -rw-r--r-- | go/storage/volume.go | 132 | ||||
| -rw-r--r-- | go/storage/volume_info.go | 4 | ||||
| -rw-r--r-- | go/storage/volume_super_block.go | 75 | ||||
| -rw-r--r-- | go/storage/volume_super_block_test.go | 23 | ||||
| -rw-r--r-- | go/storage/volume_ttl.go | 135 | ||||
| -rw-r--r-- | go/storage/volume_ttl_test.go | 60 | ||||
| -rw-r--r-- | go/storage/volume_vacuum.go | 16 |
19 files changed, 472 insertions, 118 deletions
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go index d09a87e2a..1869a563e 100644 --- a/go/storage/cdb_map.go +++ b/go/storage/cdb_map.go @@ -1,7 +1,7 @@ package storage import ( - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" diff --git a/go/storage/cdb_map_test.go b/go/storage/cdb_map_test.go index f6a7d42ad..cff7dfa61 100644 --- a/go/storage/cdb_map_test.go +++ b/go/storage/cdb_map_test.go @@ -1,7 +1,7 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "math/rand" "os" "runtime" diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go index 37b23a59f..ef43de25b 100644 --- a/go/storage/compact_map_perf_test.go +++ b/go/storage/compact_map_perf_test.go @@ -1,8 +1,8 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" "log" "os" "testing" diff --git a/go/storage/compress.go b/go/storage/compress.go index 846fd0714..a353c9d3a 100644 --- a/go/storage/compress.go +++ b/go/storage/compress.go @@ -2,7 +2,7 @@ package storage import ( "bytes" - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "compress/flate" "compress/gzip" "io/ioutil" diff --git a/go/storage/crc.go b/go/storage/crc.go index 41f7f6d00..7aa400959 100644 --- a/go/storage/crc.go +++ b/go/storage/crc.go @@ -1,7 +1,7 @@ package storage import ( - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/util" "fmt" "hash/crc32" ) @@ -25,5 +25,5 @@ func (c CRC) Value() uint32 { func (n *Needle) Etag() string { bits := make([]byte, 4) util.Uint32toBytes(bits, uint32(n.Checksum)) - return fmt.Sprintf("%x", bits) + return fmt.Sprintf("\"%x\"", bits) } diff --git a/go/storage/file_id.go b/go/storage/file_id.go index 5fcd8c387..ec566826c 100644 --- a/go/storage/file_id.go +++ b/go/storage/file_id.go @@ -1,8 +1,8 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" "encoding/hex" "errors" "strings" diff --git a/go/storage/needle.go b/go/storage/needle.go index 77aa70169..daede321b 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -1,9 +1,9 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/images" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/images" + "github.com/chrislusf/weed-fs/go/util" "encoding/hex" "errors" "io/ioutil" @@ -38,12 +38,13 @@ type Needle struct { MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk + Ttl *TTL Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` } -func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) { +func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) { form, fe := r.MultipartReader() if fe != nil { glog.V(0).Infoln("MultipartReader [ERROR]", fe) @@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string fileName = fileName[:len(fileName)-3] } modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + ttl, _ = ReadTTL(r.FormValue("ttl")) return } func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { fname, mimeType, isGzipped := "", "", false n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r) + fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r) if e != nil { return } @@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { n.LastModified = uint64(time.Now().Unix()) } n.SetHasLastModifiedDate() + if n.Ttl != EMPTY_TTL { + n.SetHasTtl() + } if fixJpgOrientation { loweredName := strings.ToLower(fname) diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 6d94ee1ca..dca2e6c5d 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -1,8 +1,8 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "os" diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 835d7c270..bf452ba37 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -1,8 +1,8 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" "errors" "fmt" "io" @@ -14,7 +14,9 @@ const ( FlagHasName = 0x02 FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 + FlagHasTtl = 0x10 LastModifiedBytesLength = 5 + TtlBytesLength = 2 ) func (n *Needle) DiskSize() int64 { @@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { if n.HasLastModifiedDate() { n.Size = n.Size + LastModifiedBytesLength } + if n.HasTtl() { + n.Size = n.Size + TtlBytesLength + } } size = n.DataSize util.Uint32toBytes(header[12:16], n.Size) @@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return } } + if n.HasTtl() { + n.Ttl.ToBytes(header[0:TtlBytesLength]) + if _, err = w.Write(header[0:TtlBytesLength]); err != nil { + return + } + } } padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) @@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength } + if index < lenBytes && n.HasTtl() { + n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + index = index + TtlBytesLength + } } func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { @@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool { func (n *Needle) SetHasLastModifiedDate() { n.Flags = n.Flags | FlagHasLastModifiedDate } +func (n *Needle) HasTtl() bool { + return n.Flags&FlagHasTtl > 0 +} +func (n *Needle) SetHasTtl() { + n.Flags = n.Flags | FlagHasTtl +} diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index 696888cd8..c1aca52eb 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -5,10 +5,6 @@ import ( "fmt" ) -const ( - ReplicaPlacementCount = 9 -) - type ReplicaPlacement struct { SameRackCount int DiffRackCount int @@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } - -func (rp *ReplicaPlacement) GetReplicationLevelIndex() int { - return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount -} diff --git a/go/storage/store.go b/go/storage/store.go index a6a4f399e..e7a9dac94 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -2,9 +2,9 @@ package storage import ( proto "code.google.com/p/goprotobuf/proto" - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "fmt" @@ -14,6 +14,10 @@ import ( "strings" ) +const ( + MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes +) + type DiskLocation struct { Directory string MaxVolumeCount int @@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error { +func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } + ttl, e := ReadTTL(ttlString) + if e != nil { + return e + } for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt) + e = s.addVolume(VolumeId(id), collection, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt); err != nil { + if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { e = err } } @@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) { } return } +func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { + e = v.Destroy() + if e != nil { + return + } + delete(volumes, v.Id) + return +} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.volumes[vid]; found { @@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, vid, collection, replicaPlacement, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } } } @@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) { for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - } - volumeMessages = append(volumeMessages, volumeMessage) if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } + if !v.expired(s.volumeSizeLimit) { + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + Ttl: proto.Uint32(v.Ttl.ToUint32()), + } + volumeMessages = append(volumeMessages, volumeMessage) + } else { + if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + s.DeleteVolume(location.volumes, v) + glog.V(0).Infoln("volume", v.Id, "is deleted.") + } else { + glog.V(0).Infoln("volume", v.Id, "is expired.") + } + } } } diff --git a/go/storage/store_vacuum.go b/go/storage/store_vacuum.go index 5adaa7561..3527e4f59 100644 --- a/go/storage/store_vacuum.go +++ b/go/storage/store_vacuum.go @@ -1,7 +1,7 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "fmt" "strconv" ) diff --git a/go/storage/volume.go b/go/storage/volume.go index 7bd8e7467..de79e9107 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -2,7 +2,7 @@ package storage import ( "bytes" - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "errors" "fmt" "io" @@ -12,22 +12,6 @@ import ( "time" ) -const ( - SuperBlockSize = 8 -) - -type SuperBlock struct { - Version Version - ReplicaPlacement *ReplicaPlacement -} - -func (s *SuperBlock) Bytes() []byte { - header := make([]byte, SuperBlockSize) - header[0] = byte(s.Version) - header[1] = s.ReplicaPlacement.Byte() - return header -} - type Volume struct { Id VolumeId dir string @@ -38,12 +22,13 @@ type Volume struct { SuperBlock - accessLock sync.Mutex + accessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} e = v.load(true, true) return } @@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { var e error fileName := v.FileName() - if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists { + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } if canWrite { v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") v.dataFile, e = os.Open(fileName + ".dat") @@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { return e } func (v *Volume) Version() Version { - return v.SuperBlock.Version + return v.SuperBlock.Version() } func (v *Volume) Size() int64 { stat, e := v.dataFile.Stat() @@ -138,44 +124,6 @@ func (v *Volume) Close() { v.nm.Close() _ = v.dataFile.Close() } -func (v *Volume) maybeWriteSuperBlock() error { - stat, e := v.dataFile.Stat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error()) - return e - } - if stat.Size() == 0 { - v.SuperBlock.Version = CurrentVersion - _, e = v.dataFile.Write(v.SuperBlock.Bytes()) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { - if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { - v.readOnly = false - } - } - } - } - return e -} -func (v *Volume) readSuperBlock() (err error) { - if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error()) - } - header := make([]byte, SuperBlockSize) - if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e.Error()) - } - v.SuperBlock, err = ParseSuperBlock(header) - return err -} -func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { - superBlock.Version = Version(header[0]) - if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err.Error()) - } - return -} func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } @@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) } } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } return } @@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) { func (v *Volume) read(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return bytesRead, err + } + if !n.HasTtl() { + return bytesRead, err + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil } return -1, errors.New("Not Found") } @@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { } return true } + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait either maxDelayMinutes or 10% of ttl minutes +func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 165af1a19..6410c1784 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -1,13 +1,14 @@ package storage import ( - "code.google.com/p/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/operation" ) type VolumeInfo struct { Id VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement + Ttl *TTL Collection string Version Version FileCount int @@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er return vi, e } vi.ReplicaPlacement = rp + vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go new file mode 100644 index 000000000..a7e86b1c3 --- /dev/null +++ b/go/storage/volume_super_block.go @@ -0,0 +1,75 @@ +package storage + +import ( + "github.com/chrislusf/weed-fs/go/glog" + "fmt" + "os" +) + +const ( + SuperBlockSize = 8 +) + +/* +* Super block currently has 8 bytes allocated for each volume. +* Byte 0: version, 1 or 2 +* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc +* Byte 2 and byte 3: Time to live. See TTL for definition +* Rest bytes: Reserved + */ +type SuperBlock struct { + version Version + ReplicaPlacement *ReplicaPlacement + Ttl *TTL +} + +func (s *SuperBlock) Version() Version { + return s.version +} +func (s *SuperBlock) Bytes() []byte { + header := make([]byte, SuperBlockSize) + header[0] = byte(s.version) + header[1] = s.ReplicaPlacement.Byte() + s.Ttl.ToBytes(header[2:4]) + return header +} + +func (v *Volume) maybeWriteSuperBlock() error { + stat, e := v.dataFile.Stat() + if e != nil { + glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error()) + return e + } + if stat.Size() == 0 { + v.SuperBlock.version = CurrentVersion + _, e = v.dataFile.Write(v.SuperBlock.Bytes()) + if e != nil && os.IsPermission(e) { + //read-only, but zero length - recreate it! + if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { + if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { + v.readOnly = false + } + } + } + } + return e +} +func (v *Volume) readSuperBlock() (err error) { + if _, err = v.dataFile.Seek(0, 0); err != nil { + return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error()) + } + header := make([]byte, SuperBlockSize) + if _, e := v.dataFile.Read(header); e != nil { + return fmt.Errorf("cannot read superblock: %s", e.Error()) + } + v.SuperBlock, err = ParseSuperBlock(header) + return err +} +func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { + superBlock.version = Version(header[0]) + if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { + err = fmt.Errorf("cannot read replica type: %s", err.Error()) + } + superBlock.Ttl = LoadTTLFromBytes(header[2:4]) + return +} diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go new file mode 100644 index 000000000..13db4b194 --- /dev/null +++ b/go/storage/volume_super_block_test.go @@ -0,0 +1,23 @@ +package storage + +import ( + "testing" +) + +func TestSuperBlockReadWrite(t *testing.T) { + rp, _ := NewReplicaPlacementFromByte(byte(001)) + ttl, _ := ReadTTL("15d") + s := &SuperBlock{ + version: CurrentVersion, + ReplicaPlacement: rp, + Ttl: ttl, + } + + bytes := s.Bytes() + + if !(bytes[2] == 15 && bytes[3] == Day) { + println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) + t.Fail() + } + +} diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go new file mode 100644 index 000000000..459ee55ba --- /dev/null +++ b/go/storage/volume_ttl.go @@ -0,0 +1,135 @@ +package storage + +import ( + "strconv" +) + +const ( + //stored unit types + Empty byte = iota + Minute + Hour + Day + Week + Month + Year +) + +type TTL struct { + count byte + unit byte +} + +var EMPTY_TTL = &TTL{} + +// translate a readable ttl to internal ttl +// Supports format example: +// 3m: 3 minutes +// 4h: 4 hours +// 5d: 5 days +// 6w: 6 weeks +// 7M: 7 months +// 8y: 8 years +func ReadTTL(ttlString string) (*TTL, error) { + if ttlString == "" { + return EMPTY_TTL, nil + } + ttlBytes := []byte(ttlString) + unitByte := ttlBytes[len(ttlBytes)-1] + countBytes := ttlBytes[0 : len(ttlBytes)-1] + if '0' <= unitByte && unitByte <= '9' { + countBytes = ttlBytes + unitByte = 'm' + } + count, err := strconv.Atoi(string(countBytes)) + unit := toStoredByte(unitByte) + return &TTL{count: byte(count), unit: unit}, err +} + +// read stored bytes to a ttl +func LoadTTLFromBytes(input []byte) (t *TTL) { + return &TTL{count: input[0], unit: input[1]} +} + +// read stored bytes to a ttl +func LoadTTLFromUint32(ttl uint32) (t *TTL) { + input := make([]byte, 2) + input[1] = byte(ttl) + input[0] = byte(ttl >> 8) + return LoadTTLFromBytes(input) +} + +// save stored bytes to an output with 2 bytes +func (t TTL) ToBytes(output []byte) { + output[0] = t.count + output[1] = t.unit +} + +func (t TTL) ToUint32() (output uint32) { + output = uint32(t.count) << 8 + output += uint32(t.unit) + return output +} + +func (t TTL) String() string { + if t.count == 0 { + return "" + } + if t.unit == Empty { + return "" + } + countString := strconv.Itoa(int(t.count)) + switch t.unit { + case Minute: + return countString + "m" + case Hour: + return countString + "h" + case Day: + return countString + "d" + case Week: + return countString + "w" + case Month: + return countString + "M" + case Year: + return countString + "y" + } + return "" +} + +func toStoredByte(readableUnitByte byte) byte { + switch readableUnitByte { + case 'm': + return Minute + case 'h': + return Hour + case 'd': + return Day + case 'w': + return Week + case 'M': + return Month + case 'y': + return Year + } + return 0 +} + +func (t TTL) Minutes() uint32 { + switch t.unit { + case Empty: + return 0 + case Minute: + return uint32(t.count) + case Hour: + return uint32(t.count) * 60 + case Day: + return uint32(t.count) * 60 * 24 + case Week: + return uint32(t.count) * 60 * 24 * 7 + case Month: + return uint32(t.count) * 60 * 24 * 31 + case Year: + return uint32(t.count) * 60 * 24 * 365 + } + return 0 +} diff --git a/go/storage/volume_ttl_test.go b/go/storage/volume_ttl_test.go new file mode 100644 index 000000000..216469a4c --- /dev/null +++ b/go/storage/volume_ttl_test.go @@ -0,0 +1,60 @@ +package storage + +import ( + "testing" +) + +func TestTTLReadWrite(t *testing.T) { + ttl, _ := ReadTTL("") + if ttl.Minutes() != 0 { + t.Errorf("empty ttl:%v", ttl) + } + + ttl, _ = ReadTTL("9") + if ttl.Minutes() != 9 { + t.Errorf("9 ttl:%v", ttl) + } + + ttl, _ = ReadTTL("8m") + if ttl.Minutes() != 8 { + t.Errorf("8m ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5h") + if ttl.Minutes() != 300 { + t.Errorf("5h ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5d") + if ttl.Minutes() != 5*24*60 { + t.Errorf("5d ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5w") + if ttl.Minutes() != 5*7*24*60 { + t.Errorf("5w ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5M") + if ttl.Minutes() != 5*31*24*60 { + t.Errorf("5M ttl:%v", ttl) + } + + ttl, _ = ReadTTL("5y") + if ttl.Minutes() != 5*365*24*60 { + t.Errorf("5y ttl:%v", ttl) + } + + output := make([]byte, 2) + ttl.ToBytes(output) + ttl2 := LoadTTLFromBytes(output) + if ttl.Minutes() != ttl2.Minutes() { + t.Errorf("ttl:%v ttl2:%v", ttl, ttl2) + } + + ttl3 := LoadTTLFromUint32(ttl.ToUint32()) + if ttl.Minutes() != ttl3.Minutes() { + t.Errorf("ttl:%v ttl3:%v", ttl, ttl3) + } + +} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7d2a38cb8..b348434d2 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -1,10 +1,10 @@ package storage import ( - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "fmt" "os" - _ "time" + "time" ) func (v *Volume) garbageLevel() float64 { @@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact() error { glog.V(3).Infof("Compacting ...") - v.accessLock.Lock() - defer v.accessLock.Unlock() - glog.V(3).Infof("Got Compaction lock...") + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() glog.V(3).Infof("creating copies for volume %d ...", v.Id) @@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nm := NewNeedleMap(idx) new_offset := int64(SuperBlockSize) + now := uint64(time.Now().Unix()) + err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { _, err = dst.Write(superBlock.Bytes()) return err }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { |
