aboutsummaryrefslogtreecommitdiff
path: root/go/storage/volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage/volume.go')
-rw-r--r--go/storage/volume.go83
1 files changed, 15 insertions, 68 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 1988c9aac..2b47fb497 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -27,10 +27,10 @@ type Volume struct {
lastModifiedTime uint64 //unix time in seconds
}
-func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
- e = v.load(true, true)
+ e = v.load(true, true, useLevelDb)
return
}
func (v *Volume) String() string {
@@ -40,7 +40,7 @@ func (v *Volume) String() string {
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = SuperBlock{}
- e = v.load(false, false)
+ e = v.load(false, false, false)
return
}
func (v *Volume) FileName() (fileName string) {
@@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) {
}
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error {
var e error
fileName := v.FileName()
@@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
e = v.maybeWriteSuperBlock()
}
if e == nil && alsoLoadIndex {
- if v.readOnly {
- if v.ensureConvertIdxToCdb(fileName) {
- v.nm, e = OpenCdbMap(fileName + ".cdb")
- return e
- }
- }
var indexFile *os.File
if v.readOnly {
glog.V(1).Infoln("open to read file", fileName+".idx")
@@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
}
}
- glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly)
- if v.nm, e = LoadNeedleMap(indexFile); e != nil {
- glog.V(0).Infoln("loading error:", e)
+ if !useLevelDb {
+ glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly)
+ if v.nm, e = LoadNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e)
+ }
+ } else {
+ glog.V(0).Infoln("loading leveldb file", fileName+".ldb")
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
}
}
return e
@@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
}
nv, ok := v.nm.Get(n.Id)
if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
+ if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
@@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) {
return -1, errors.New("Not Found")
}
-func (v *Volume) freeze() error {
- if v.readOnly {
- return nil
- }
- nm, ok := v.nm.(*NeedleMap)
- if !ok {
- return nil
- }
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- bn, _ := baseFilename(v.dataFile.Name())
- cdbFn := bn + ".cdb"
- glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
- err := DumpNeedleMapToCdb(cdbFn, nm)
- if err != nil {
- return err
- }
- if v.nm, err = OpenCdbMap(cdbFn); err != nil {
- return err
- }
- nm.indexFile.Close()
- os.Remove(nm.indexFile.Name())
- v.readOnly = true
- return nil
-}
-
func ScanVolumeFile(dirname string, collection string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
readNeedleBody bool,
@@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
modTime = fi.ModTime()
return
}
-func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) {
- var indexFile *os.File
- var e error
- _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb")
- _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx")
- if cdbCanRead && cdbModTime.After(idxModeTime) {
- return true
- }
- if !cdbCanWrite {
- return false
- }
- if !idxCanRead {
- glog.V(0).Infoln("Can not read file", fileName+".idx!")
- return false
- }
- glog.V(2).Infoln("opening file", fileName+".idx")
- if indexFile, e = os.Open(fileName + ".idx"); e != nil {
- glog.V(0).Infoln("Failed to read file", fileName+".idx !")
- return false
- }
- defer indexFile.Close()
- glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
- if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
- glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e)
- return false
- }
- return true
-}
// volume is expired if modified time + volume ttl < now
// except when volume is empty