aboutsummaryrefslogtreecommitdiff
path: root/go/storage/volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage/volume.go')
-rw-r--r--go/storage/volume.go66
1 files changed, 39 insertions, 27 deletions
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 01d37285d..c212e9b37 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -2,10 +2,10 @@ package storage
import (
"bytes"
+ "code.google.com/p/weed-fs/go/glog"
"errors"
"fmt"
"io"
- "log"
"os"
"path"
"sync"
@@ -45,7 +45,7 @@ func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v
e = v.load(true)
return
}
-func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
+func loadVolumeWithoutIndex(dirname string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Id: id}
v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
e = v.load(false)
@@ -57,12 +57,12 @@ func (v *Volume) load(alsoLoadIndex bool) error {
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
if !os.IsPermission(e) {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e.Error())
}
if v.dataFile, e = os.Open(fileName + ".dat"); e != nil {
- return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e)
+ return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e.Error())
}
- log.Printf("opening " + fileName + ".dat in READONLY mode")
+ glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
v.readOnly = true
}
if v.ReplicaType == CopyNil {
@@ -73,28 +73,40 @@ func (v *Volume) load(alsoLoadIndex bool) error {
if e == nil && alsoLoadIndex {
var indexFile *os.File
if v.readOnly {
+ glog.V(4).Infoln("opening file", fileName+".idx")
if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) {
- return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e)
+ return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e.Error())
}
if indexFile != nil {
- log.Printf("converting %s.idx to %s.cdb", fileName, fileName)
- if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
- log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName)
- } else {
- indexFile.Close()
- os.Remove(indexFile.Name())
- indexFile = nil
+ glog.V(4).Infoln("check file", fileName+".cdb")
+ if _, err := os.Stat(fileName + ".cdb"); os.IsNotExist(err) {
+ glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName)
+ if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
+ glog.V(0).Infof("error converting %s.idx to %s.cdb: %s", fileName, e.Error())
+ } else {
+ indexFile.Close()
+ indexFile = nil
+ }
}
}
- v.nm, e = OpenCdbMap(fileName + ".cdb")
- return e
- } else {
- indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
- if e != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ glog.V(4).Infoln("open file", fileName+".cdb")
+ if v.nm, e = OpenCdbMap(fileName + ".cdb"); e != nil {
+ if os.IsNotExist(e) {
+ glog.V(0).Infof("Failed to read cdb file :%s, fall back to normal readonly mode.", fileName)
+ } else {
+ glog.V(0).Infof("%s.cdb open errro:%s", fileName, e.Error())
+ return e
+ }
}
}
+ glog.V(4).Infoln("open to write file", fileName+".idx")
+ indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ if e != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e.Error())
+ }
+ glog.V(4).Infoln("loading file", fileName+".idx")
v.nm, e = LoadNeedleMap(indexFile)
+ glog.V(4).Infoln("loading error:", e)
}
return e
}
@@ -108,7 +120,7 @@ func (v *Volume) Size() int64 {
if e == nil {
return stat.Size()
}
- log.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
+ glog.V(0).Infof("Failed to read file size %s %s", v.dataFile.Name(), e.Error())
return -1
}
func (v *Volume) Close() {
@@ -120,7 +132,7 @@ func (v *Volume) Close() {
func (v *Volume) maybeWriteSuperBlock() error {
stat, e := v.dataFile.Stat()
if e != nil {
- log.Printf("failed to stat datafile %s: %s", v.dataFile, e)
+ glog.V(0).Infof("failed to stat datafile %s: %s", v.dataFile, e.Error())
return e
}
if stat.Size() == 0 {
@@ -221,10 +233,10 @@ func (v *Volume) delete(n *Needle) (uint32, error) {
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok {
size := nv.Size
- if err:= v.nm.Delete(n.Id); err != nil {
+ if err := v.nm.Delete(n.Id); err != nil {
return size, err
}
- if _, err:= v.dataFile.Seek(0, 2); err != nil {
+ if _, err := v.dataFile.Seek(0, 2); err != nil {
return size, err
}
n.Data = make([]byte, 0)
@@ -286,7 +298,7 @@ func (v *Volume) freeze() error {
defer v.accessLock.Unlock()
bn, _ := nakeFilename(v.dataFile.Name())
cdbFn := bn + ".cdb"
- log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn)
+ glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn)
err := DumpNeedleMapToCdb(cdbFn, nm)
if err != nil {
return err
@@ -304,7 +316,7 @@ func ScanVolumeFile(dirname string, id VolumeId,
visitSuperBlock func(SuperBlock) error,
visitNeedle func(n *Needle, offset uint32) error) (err error) {
var v *Volume
- if v, err = LoadVolumeOnly(dirname, id); err != nil {
+ if v, err = loadVolumeWithoutIndex(dirname, id); err != nil {
return
}
if err = visitSuperBlock(v.SuperBlock); err != nil {
@@ -361,7 +373,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
return err
}, func(n *Needle, offset uint32) error {
nv, ok := v.nm.Get(n.Id)
- //log.Println("file size is", n.Size, "rest", rest)
+ //glog.V(0).Infoln("file size is", n.Size, "rest", rest)
if ok && nv.Offset*NeedlePaddingSize == offset {
if nv.Size > 0 {
if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
@@ -371,7 +383,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize()
- //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
+ //glog.V(0).Infoln("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
}
}
return nil