diff options
Diffstat (limited to 'go/storage/volume.go')
| -rw-r--r-- | go/storage/volume.go | 132 |
1 files changed, 70 insertions, 62 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go index 7bd8e7467..de79e9107 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -2,7 +2,7 @@ package storage import ( "bytes" - "code.google.com/p/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/glog" "errors" "fmt" "io" @@ -12,22 +12,6 @@ import ( "time" ) -const ( - SuperBlockSize = 8 -) - -type SuperBlock struct { - Version Version - ReplicaPlacement *ReplicaPlacement -} - -func (s *SuperBlock) Bytes() []byte { - header := make([]byte, SuperBlockSize) - header[0] = byte(s.Version) - header[1] = s.ReplicaPlacement.Byte() - return header -} - type Volume struct { Id VolumeId dir string @@ -38,12 +22,13 @@ type Volume struct { SuperBlock - accessLock sync.Mutex + accessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} e = v.load(true, true) return } @@ -65,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { var e error fileName := v.FileName() - if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists { + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } if canWrite { v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") v.dataFile, e = os.Open(fileName + ".dat") @@ -122,7 +108,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { return e } func (v *Volume) Version() Version { - return v.SuperBlock.Version + return v.SuperBlock.Version() } func (v *Volume) Size() int64 { stat, e := v.dataFile.Stat() @@ -138,44 +124,6 @@ func (v *Volume) Close() { v.nm.Close() _ = v.dataFile.Close() } -func (v *Volume) maybeWriteSuperBlock() error { - stat, e := v.dataFile.Stat() - if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error()) - return e - } - if stat.Size() == 0 { - v.SuperBlock.Version = CurrentVersion - _, e = v.dataFile.Write(v.SuperBlock.Bytes()) - if e != nil && os.IsPermission(e) { - //read-only, but zero length - recreate it! - if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil { - if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil { - v.readOnly = false - } - } - } - } - return e -} -func (v *Volume) readSuperBlock() (err error) { - if _, err = v.dataFile.Seek(0, 0); err != nil { - return fmt.Errorf("cannot seek to the beginning of %s: %s", v.dataFile.Name(), err.Error()) - } - header := make([]byte, SuperBlockSize) - if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e.Error()) - } - v.SuperBlock, err = ParseSuperBlock(header) - return err -} -func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { - superBlock.Version = Version(header[0]) - if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err.Error()) - } - return -} func (v *Volume) NeedToReplicate() bool { return v.ReplicaPlacement.GetCopyCount() > 1 } @@ -246,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) } } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } return } @@ -275,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) { func (v *Volume) read(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return bytesRead, err + } + if !n.HasTtl() { + return bytesRead, err + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil } return -1, errors.New("Not Found") } @@ -397,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { } return true } + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait either maxDelayMinutes or 10% of ttl minutes +func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} |
