diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle_map.go | 158 | ||||
| -rw-r--r-- | go/storage/needle_map_leveldb.go | 151 | ||||
| -rw-r--r-- | go/storage/needle_map_memory.go | 126 | ||||
| -rw-r--r-- | go/storage/store.go | 40 | ||||
| -rw-r--r-- | go/storage/volume.go | 83 | ||||
| -rw-r--r-- | go/storage/volume_vacuum.go | 4 |
6 files changed, 330 insertions, 232 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index f74191742..0bfa12180 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -2,15 +2,13 @@ package storage import ( "fmt" - "io" "os" - "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/util" ) type NeedleMapper interface { - Put(key uint64, offset uint32, size uint32) (int, error) + Put(key uint64, offset uint32, size uint32) error Get(key uint64) (element *NeedleValue, ok bool) Delete(key uint64) error Close() @@ -19,7 +17,6 @@ type NeedleMapper interface { DeletedSize() uint64 FileCount() int DeletedCount() int - Visit(visit func(NeedleValue) error) (err error) MaxFileKey() uint64 } @@ -31,146 +28,33 @@ type mapMetric struct { MaximumFileKey uint64 `json:"MaxFileKey"` } -type NeedleMap struct { - indexFile *os.File - m CompactMap - - mapMetric -} - -func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: NewCompactMap(), - indexFile: file, - } - return nm -} - -const ( - RowsToRead = 1024 -) - -func LoadNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewNeedleMap(file) - e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key - } - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - } else { - oldSize := nm.m.Delete(Key(key)) - glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - return nil - }) - glog.V(1).Infoln("max file key:", nm.MaximumFileKey) - return nm, e -} - -// walks through the index file, calls fn function with each key, offset, size -// stops with the error returned by the fn function -func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { - var readerOffset int64 - bytes := make([]byte, 16*RowsToRead) - count, e := r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - var ( - key uint64 - offset, size uint32 - i int - ) - - for count > 0 && e == nil || e == io.EOF { - for i = 0; i+16 <= count; i += 16 { - key = util.BytesToUint64(bytes[i : i+8]) - offset = util.BytesToUint32(bytes[i+8 : i+12]) - size = util.BytesToUint32(bytes[i+12 : i+16]) - if e = fn(key, offset, size); e != nil { - return e - } - } - if e == io.EOF { - return nil - } - count, e = r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) - readerOffset += int64(count) - } - return e -} - -func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { - if key > nm.MaximumFileKey { - nm.MaximumFileKey = key - } - oldSize := nm.m.Set(Key(key), offset, size) +func appendToIndexFile(indexFile *os.File, + key uint64, offset uint32, size uint32) error { bytes := make([]byte, 16) util.Uint64toBytes(bytes[0:8], key) util.Uint32toBytes(bytes[8:12], offset) util.Uint32toBytes(bytes[12:16], size) - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return 0, fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) + if _, err := indexFile.Seek(0, 2); err != nil { + return fmt.Errorf("cannot seek end of indexfile %s: %v", + indexFile.Name(), err) } - return nm.indexFile.Write(bytes) + _, err := indexFile.Write(bytes) + return err } -func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) - return + +func (mm *mapMetric) logDelete(deletedByteCount uint32) { + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount) + mm.DeletionCounter++ } -func (nm *NeedleMap) Delete(key uint64) error { - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(nm.m.Delete(Key(key))) - bytes := make([]byte, 16) - util.Uint64toBytes(bytes[0:8], key) - util.Uint32toBytes(bytes[8:12], 0) - util.Uint32toBytes(bytes[12:16], 0) - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot go to the end of indexfile %s: %v", nm.indexFile.Name(), err) + +func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) { + if key > mm.MaximumFileKey { + mm.MaximumFileKey = key } - if _, err := nm.indexFile.Write(bytes); err != nil { - return fmt.Errorf("error writing to indexfile %s: %v", nm.indexFile.Name(), err) + mm.FileCounter++ + mm.FileByteCounter = mm.FileByteCounter + uint64(newSize) + if oldSize > 0 { + mm.DeletionCounter++ + mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize) } - nm.DeletionCounter++ - return nil -} -func (nm *NeedleMap) Close() { - _ = nm.indexFile.Close() -} -func (nm *NeedleMap) Destroy() error { - nm.Close() - return os.Remove(nm.indexFile.Name()) -} -func (nm NeedleMap) ContentSize() uint64 { - return nm.FileByteCounter -} -func (nm NeedleMap) DeletedSize() uint64 { - return nm.DeletionByteCounter -} -func (nm NeedleMap) FileCount() int { - return nm.FileCounter -} -func (nm NeedleMap) DeletedCount() int { - return nm.DeletionCounter -} -func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { - return nm.m.Visit(visit) -} -func (nm NeedleMap) MaxFileKey() uint64 { - return nm.MaximumFileKey } diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go new file mode 100644 index 000000000..73595278d --- /dev/null +++ b/go/storage/needle_map_leveldb.go @@ -0,0 +1,151 @@ +package storage + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" + "github.com/syndtr/goleveldb/leveldb" +) + +type LevelDbNeedleMap struct { + dbFileName string + indexFile *os.File + db *leveldb.DB + mapMetric +} + +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { + m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + if !isLevelDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG")) + if err != nil { + return false + } + defer dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateDbFile(dbFileName string, indexFile *os.File) error { + db, err := leveldb.OpenFile(dbFileName, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + levelDbWrite(db, key, offset, size) + } else { + levelDbDelete(db, key) + } + return nil + }) +} + +func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + data, err := m.db.Get(bytes, nil) + if err != nil || len(data) != 8 { + glog.V(0).Infof("Failed to get %d %v", key, err) + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *LevelDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return levelDbWrite(m.db, key, offset, size) +} + +func levelDbWrite(db *leveldb.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + if err := db.Put(bytes[0:8], bytes[8:16], nil); err != nil { + return fmt.Errorf("failed to write leveldb: %v", err) + } + return nil +} +func levelDbDelete(db *leveldb.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Delete(bytes, nil) +} + +func (m *LevelDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + return err + } + return levelDbDelete(m.db, key) +} + +func (m *LevelDbNeedleMap) Close() { + m.db.Close() +} + +func (m *LevelDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} + +func (m *LevelDbNeedleMap) ContentSize() uint64 { + return m.FileByteCounter +} +func (m *LevelDbNeedleMap) DeletedSize() uint64 { + return m.DeletionByteCounter +} +func (m *LevelDbNeedleMap) FileCount() int { + return m.FileCounter +} +func (m *LevelDbNeedleMap) DeletedCount() int { + return m.DeletionCounter +} +func (m *LevelDbNeedleMap) MaxFileKey() uint64 { + return m.MaximumFileKey +} diff --git a/go/storage/needle_map_memory.go b/go/storage/needle_map_memory.go new file mode 100644 index 000000000..5fce301bc --- /dev/null +++ b/go/storage/needle_map_memory.go @@ -0,0 +1,126 @@ +package storage + +import ( + "io" + "os" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" +) + +type NeedleMap struct { + indexFile *os.File + m CompactMap + + mapMetric +} + +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: NewCompactMap(), + indexFile: file, + } + return nm +} + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) (*NeedleMap, error) { + nm := NewNeedleMap(file) + e := WalkIndexFile(file, func(key uint64, offset, size uint32) error { + if key > nm.MaximumFileKey { + nm.MaximumFileKey = key + } + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + } else { + oldSize := nm.m.Delete(Key(key)) + glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) + } + return nil + }) + glog.V(1).Infoln("max file key:", nm.MaximumFileKey) + return nm, e +} + +// walks through the index file, calls fn function with each key, offset, size +// stops with the error returned by the fn function +func WalkIndexFile(r *os.File, fn func(key uint64, offset, size uint32) error) error { + var readerOffset int64 + bytes := make([]byte, 16*RowsToRead) + count, e := r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + var ( + key uint64 + offset, size uint32 + i int + ) + + for count > 0 && e == nil || e == io.EOF { + for i = 0; i+16 <= count; i += 16 { + key = util.BytesToUint64(bytes[i : i+8]) + offset = util.BytesToUint32(bytes[i+8 : i+12]) + size = util.BytesToUint32(bytes[i+12 : i+16]) + if e = fn(key, offset, size); e != nil { + return e + } + } + if e == io.EOF { + return nil + } + count, e = r.ReadAt(bytes, readerOffset) + glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + readerOffset += int64(count) + } + return e +} + +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) error { + oldSize := nm.m.Set(Key(key), offset, size) + nm.logPut(key, oldSize, size) + return appendToIndexFile(nm.indexFile, key, offset, size) +} +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + element, ok = nm.m.Get(Key(key)) + return +} +func (nm *NeedleMap) Delete(key uint64) error { + deletedBytes := nm.m.Delete(Key(key)) + nm.logDelete(deletedBytes) + return appendToIndexFile(nm.indexFile, key, 0, 0) +} +func (nm *NeedleMap) Close() { + _ = nm.indexFile.Close() +} +func (nm *NeedleMap) Destroy() error { + nm.Close() + return os.Remove(nm.indexFile.Name()) +} +func (nm NeedleMap) ContentSize() uint64 { + return nm.FileByteCounter +} +func (nm NeedleMap) DeletedSize() uint64 { + return nm.DeletionByteCounter +} +func (nm NeedleMap) FileCount() int { + return nm.FileCounter +} +func (nm NeedleMap) DeletedCount() int { + return nm.DeletionCounter +} + +func (nm NeedleMap) MaxFileKey() uint64 { + return nm.MaximumFileKey +} diff --git a/go/storage/store.go b/go/storage/store.go index 2695537f6..65e5b218b 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -90,18 +90,18 @@ func (s *Store) String() (str string) { return } -func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location.volumes = make(map[VolumeId]*Volume) - location.loadExistingVolumes() + location.loadExistingVolumes(useLevelDb) s.Locations = append(s.Locations, location) } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt, ttl) + e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil { e = err } } @@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -195,20 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *Rep return fmt.Errorf("No more free space left") } -func (s *Store) FreezeVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - } - if v := s.findVolume(vid); v != nil { - if v.readOnly { - return fmt.Errorf("Volume %s is already read-only", volumeIdString) - } - return v.freeze() - } - return fmt.Errorf("volume id %d is not found during freeze", vid) -} -func (l *DiskLocation) loadExistingVolumes() { +func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { name := dir.Name() @@ -221,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil { l.volumes[vid] = v glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } @@ -261,7 +248,7 @@ func (s *Store) SetRack(rack string) { func (s *Store) SetBootstrapMaster(bootstrapMaster string) { s.masterNodes = NewMasterNodes(bootstrapMaster) } -func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) { +func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) { masterNode, e = s.masterNodes.findMaster() if e != nil { return @@ -317,13 +304,16 @@ func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) { return "", "", err } - jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data) + joinUrl := "http://" + masterNode + "/dir/join" + + jsonBlob, err := util.PostBytes(joinUrl, data) if err != nil { s.masterNodes.reset() return "", "", err } var ret operation.JoinResult if err := json.Unmarshal(jsonBlob, &ret); err != nil { + glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob)) return masterNode, "", err } if ret.Error != "" { @@ -354,7 +344,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { } if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) { glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit) - if _, _, e := s.Join(); e != nil { + if _, _, e := s.SendHeartbeatToMaster(); e != nil { glog.V(0).Infoln("error when reporting size:", e) } } diff --git a/go/storage/volume.go b/go/storage/volume.go index 1988c9aac..2b47fb497 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -27,10 +27,10 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} - e = v.load(true, true) + e = v.load(true, true, useLevelDb) return } func (v *Volume) String() string { @@ -40,7 +40,7 @@ func (v *Volume) String() string { func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} - e = v.load(false, false) + e = v.load(false, false, false) return } func (v *Volume) FileName() (fileName string) { @@ -51,7 +51,7 @@ func (v *Volume) FileName() (fileName string) { } return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { var e error fileName := v.FileName() @@ -87,12 +87,6 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - if v.readOnly { - if v.ensureConvertIdxToCdb(fileName) { - v.nm, e = OpenCdbMap(fileName + ".cdb") - return e - } - } var indexFile *os.File if v.readOnly { glog.V(1).Infoln("open to read file", fileName+".idx") @@ -105,9 +99,16 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - glog.V(0).Infoln("loading file", fileName+".idx", "readonly", v.readOnly) - if v.nm, e = LoadNeedleMap(indexFile); e != nil { - glog.V(0).Infoln("loading error:", e) + if !useLevelDb { + glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) + if v.nm, e = LoadNeedleMap(indexFile); e != nil { + glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) + } + } else { + glog.V(0).Infoln("loading leveldb file", fileName+".ldb") + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) + } } } return e @@ -202,7 +203,7 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { } nv, ok := v.nm.Get(n.Id) if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { - if _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { + if err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -261,32 +262,6 @@ func (v *Volume) read(n *Needle) (int, error) { return -1, errors.New("Not Found") } -func (v *Volume) freeze() error { - if v.readOnly { - return nil - } - nm, ok := v.nm.(*NeedleMap) - if !ok { - return nil - } - v.accessLock.Lock() - defer v.accessLock.Unlock() - bn, _ := baseFilename(v.dataFile.Name()) - cdbFn := bn + ".cdb" - glog.V(0).Infof("converting %s to %s", nm.indexFile.Name(), cdbFn) - err := DumpNeedleMapToCdb(cdbFn, nm) - if err != nil { - return err - } - if v.nm, err = OpenCdbMap(cdbFn); err != nil { - return err - } - nm.indexFile.Close() - os.Remove(nm.indexFile.Name()) - v.readOnly = true - return nil -} - func ScanVolumeFile(dirname string, collection string, id VolumeId, visitSuperBlock func(SuperBlock) error, readNeedleBody bool, @@ -365,34 +340,6 @@ func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti modTime = fi.ModTime() return } -func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { - var indexFile *os.File - var e error - _, cdbCanRead, cdbCanWrite, cdbModTime := checkFile(fileName + ".cdb") - _, idxCanRead, _, idxModeTime := checkFile(fileName + ".idx") - if cdbCanRead && cdbModTime.After(idxModeTime) { - return true - } - if !cdbCanWrite { - return false - } - if !idxCanRead { - glog.V(0).Infoln("Can not read file", fileName+".idx!") - return false - } - glog.V(2).Infoln("opening file", fileName+".idx") - if indexFile, e = os.Open(fileName + ".idx"); e != nil { - glog.V(0).Infoln("Failed to read file", fileName+".idx !") - return false - } - defer indexFile.Close() - glog.V(0).Infof("converting %s.idx to %s.cdb", fileName, fileName) - if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { - glog.V(0).Infof("error converting %s.idx to %s.cdb: %v", fileName, fileName, e) - return false - } - return true -} // volume is expired if modified time + volume ttl < now // except when volume is empty diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7e026a61d..9f6f8e35f 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error { } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - if e = v.load(true, false); e != nil { + if e = v.load(true, false, false); e != nil { return e } return nil @@ -73,7 +73,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if _, err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, err = n.Append(dst, v.Version()); err != nil { |
