diff options
Diffstat (limited to 'go/storage/volume.go')
| -rw-r--r-- | go/storage/volume.go | 83 |
1 files changed, 15 insertions, 68 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go index 1988c9aac..2b47fb497 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -27,10 +27,10 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} - e = v.load(true, true) + e = v.load(true, true, useLevelDb) return } func (v *Volume) String() string { @@ -40,7 +40,7 @@ func (v *Volume) String() string { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} - e = v.load(false, false) + e = v.load(false, false, false) return } func (v *Volume) FileName() (fileName string) { @@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) { } return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { var e error fileName := v.FileName() @@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - if v.readOnly { - if v.ensureConvertIdxToCdb(fileName) { - v.nm, e = OpenCdbMap(fileName + ".cdb") - return e - } - } var indexFile *os.File if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") @@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly) - if v.nm, e = LoadNeedleMap(indexFile); e != nil { - glog.V(0).Infoln("loading error:", e) + if !useLevelDb { + glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) + if v.nm, e = LoadNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) + } + } else { + glog.V(0).Infoln("loading leveldb file", fileName+".ldb") + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) + } } } return e @@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { - if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { + if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) { return -1, errors.New("Not Found") } -func (v *Volume) freeze() error { - if v.readOnly { - return nil - } - nm, ok := v.nm.(*NeedleMap) - if !ok { - return nil - } - v.accessLock.Lock() - defer v.accessLock.Unlock() - bn, _ := baseFilename(v.dataFile.Name()) - cdbFn := bn + ".cdb" - glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn) - err := DumpNeedleMapToCdb(cdbFn, nm) - if err != nil { - return err - } - if v.nm, err = OpenCdbMap(cdbFn); err != nil { - return err - } - nm.indexFile.Close() - os.Remove(nm.indexFile.Name()) - v.readOnly = true - return nil -} - func ScanVolumeFile(dirname string, collection string, id VolumeId, visitSuperBlock func(SuperBlock) error, readNeedleBody bool, @@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti modTime = fi.ModTime() return } -func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { - var indexFile *os.File - var e error - _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb") - _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx") - if cdbCanRead && cdbModTime.After(idxModeTime) { - return true - } - if !cdbCanWrite { - return false - } - if !idxCanRead { - glog.V(0).Infoln("Can not read file", fileName+".idx!") - return false - } - glog.V(2).Infoln("opening file", fileName+".idx") - if indexFile, e = os.Open(fileName + ".idx"); e != nil { - glog.V(0).Infoln("Failed to read file", fileName+".idx !") - return false - } - defer indexFile.Close() - glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e) - return false - } - return true -} // volume is expired if modified time + volume ttl < now // except when volume is empty |
