diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle_map.go | 8 | ||||
| -rw-r--r-- | go/storage/needle_map_boltdb.go | 182 | ||||
| -rw-r--r-- | go/storage/needle_map_leveldb.go | 5 | ||||
| -rw-r--r-- | go/storage/store.go | 18 | ||||
| -rw-r--r-- | go/storage/volume.go | 38 | ||||
| -rw-r--r-- | go/storage/volume_vacuum.go | 41 |
6 files changed, 246 insertions, 46 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index 0bfa12180..c05aae745 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -7,6 +7,14 @@ import ( "github.com/chrislusf/weed-fs/go/util" ) +type NeedleMapType int + +const ( + NeedleMapInMemory NeedleMapType = iota + NeedleMapLevelDb + NeedleMapBoltDb +) + type NeedleMapper interface { Put(key uint64, offset uint32, size uint32) error Get(key uint64) (element *NeedleValue, ok bool) diff --git a/go/storage/needle_map_boltdb.go b/go/storage/needle_map_boltdb.go new file mode 100644 index 000000000..bef10299a --- /dev/null +++ b/go/storage/needle_map_boltdb.go @@ -0,0 +1,182 @@ +package storage + +import ( + "fmt" + "os" + + "github.com/boltdb/bolt" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/util" +) + +type BoltDbNeedleMap struct { + dbFileName string + indexFile *os.File + db *bolt.DB + mapMetric +} + +var boltdbBucket = []byte("weed") + +func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { + m = &BoltDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} + if !isBoltDbFresh(dbFileName, indexFile) { + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + generateBoltDbFile(dbFileName, indexFile) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", dbFileName) + if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil { + return + } + glog.V(1).Infof("Loading %s...", indexFile.Name()) + nm, indexLoadError := LoadNeedleMap(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = nm.mapMetric + return +} + +func isBoltDbFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbLogFile, err := os.Open(dbFileName) + if err != nil { + return false + } + defer dbLogFile.Close() + dbStat, dbStatErr := dbLogFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func generateBoltDbFile(dbFileName string, indexFile *os.File) error { + db, err := bolt.Open(dbFileName, 0644, nil) + if err != nil { + return err + } + defer db.Close() + return WalkIndexFile(indexFile, func(key uint64, offset, size uint32) error { + if offset > 0 { + boltDbWrite(db, key, offset, size) + } else { + boltDbDelete(db, key) + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + bytes := make([]byte, 8) + var data []byte + util.Uint64toBytes(bytes, key) + err := m.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(boltdbBucket) + if bucket == nil { + return fmt.Errorf("Bucket %q not found!", boltdbBucket) + } + + data = bucket.Get(bytes) + return nil + }) + + if err != nil || len(data) != 8 { + glog.V(0).Infof("Failed to get %d %v", key, err) + return nil, false + } + offset := util.BytesToUint32(data[0:4]) + size := util.BytesToUint32(data[4:8]) + return &NeedleValue{Key: Key(key), Offset: offset, Size: size}, true +} + +func (m *BoltDbNeedleMap) Put(key uint64, offset uint32, size uint32) error { + var oldSize uint32 + if oldNeedle, ok := m.Get(key); ok { + oldSize = oldNeedle.Size + } + m.logPut(key, oldSize, size) + // write to index file first + if err := appendToIndexFile(m.indexFile, key, offset, size); err != nil { + return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) + } + return boltDbWrite(m.db, key, offset, size) +} + +func boltDbWrite(db *bolt.DB, + key uint64, offset uint32, size uint32) error { + bytes := make([]byte, 16) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], offset) + util.Uint32toBytes(bytes[12:16], size) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Put(bytes[0:8], bytes[8:16]) + if err != nil { + return err + } + return nil + }) +} +func boltDbDelete(db *bolt.DB, key uint64) error { + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, key) + return db.Update(func(tx *bolt.Tx) error { + bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) + if err != nil { + return err + } + + err = bucket.Delete(bytes) + if err != nil { + return err + } + return nil + }) +} + +func (m *BoltDbNeedleMap) Delete(key uint64) error { + if oldNeedle, ok := m.Get(key); ok { + m.logDelete(oldNeedle.Size) + } + // write to index file first + if err := appendToIndexFile(m.indexFile, key, 0, 0); err != nil { + return err + } + return boltDbDelete(m.db, key) +} + +func (m *BoltDbNeedleMap) Close() { + m.db.Close() +} + +func (m *BoltDbNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.dbFileName) +} + +func (m *BoltDbNeedleMap) ContentSize() uint64 { + return m.FileByteCounter +} +func (m *BoltDbNeedleMap) DeletedSize() uint64 { + return m.DeletionByteCounter +} +func (m *BoltDbNeedleMap) FileCount() int { + return m.FileCounter +} +func (m *BoltDbNeedleMap) DeletedCount() int { + return m.DeletionCounter +} +func (m *BoltDbNeedleMap) MaxFileKey() uint64 { + return m.MaximumFileKey +} diff --git a/go/storage/needle_map_leveldb.go b/go/storage/needle_map_leveldb.go index 73595278d..32f763b4f 100644 --- a/go/storage/needle_map_leveldb.go +++ b/go/storage/needle_map_leveldb.go @@ -21,7 +21,7 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl m = &LevelDbNeedleMap{indexFile: indexFile, dbFileName: dbFileName} if !isLevelDbFresh(dbFileName, indexFile) { glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) - generateDbFile(dbFileName, indexFile) + generateLevelDbFile(dbFileName, indexFile) glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) } glog.V(1).Infof("Opening %s...", dbFileName) @@ -54,7 +54,7 @@ func isLevelDbFresh(dbFileName string, indexFile *os.File) bool { return dbStat.ModTime().After(indexStat.ModTime()) } -func generateDbFile(dbFileName string, indexFile *os.File) error { +func generateLevelDbFile(dbFileName string, indexFile *os.File) error { db, err := leveldb.OpenFile(dbFileName, nil) if err != nil { return err @@ -75,7 +75,6 @@ func (m *LevelDbNeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { util.Uint64toBytes(bytes, key) data, err := m.db.Get(bytes, nil) if err != nil || len(data) != 8 { - glog.V(0).Infof("Failed to get %d %v", key, err) return nil, false } offset := util.BytesToUint32(data[0:4]) diff --git a/go/storage/store.go b/go/storage/store.go index 65e5b218b..eb44bd9d0 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -90,18 +90,18 @@ func (s *Store) String() (str string) { return } -func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, useLevelDb bool) (s *Store) { +func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { location := &DiskLocation{Directory: dirnames[i], MaxVolumeCount: maxVolumeCounts[i]} location.volumes = make(map[VolumeId]*Volume) - location.loadExistingVolumes(useLevelDb) + location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) } return } -func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb bool, replicaPlacement string, ttlString string) error { +func (s *Store) AddVolume(volumeListString string, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -117,7 +117,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl) + e = s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -129,7 +129,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, useLevelDb return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, useLevelDb, rt, ttl); err != nil { + if err := s.addVolume(VolumeId(id), collection, needleMapKind, rt, ttl); err != nil { e = err } } @@ -178,14 +178,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) error { +func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, useLevelDb, replicaPlacement, ttl); err == nil { + if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -195,7 +195,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, useLevelDb bool, repl return fmt.Errorf("No more free space left") } -func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { if dirs, err := ioutil.ReadDir(l.Directory); err == nil { for _, dir := range dirs { name := dir.Name() @@ -208,7 +208,7 @@ func (l *DiskLocation) loadExistingVolumes(useLevelDb bool) { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, useLevelDb, nil, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil); e == nil { l.volumes[vid] = v glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } diff --git a/go/storage/volume.go b/go/storage/volume.go index 2b47fb497..e35eeee49 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -14,12 +14,13 @@ import ( ) type Volume struct { - Id VolumeId - dir string - Collection string - dataFile *os.File - nm NeedleMapper - readOnly bool + Id VolumeId + dir string + Collection string + dataFile *os.File + nm NeedleMapper + needleMapKind NeedleMapType + readOnly bool SuperBlock @@ -27,20 +28,22 @@ type Volume struct { lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, useLevelDb bool, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} - e = v.load(true, true, useLevelDb) + v.needleMapKind = needleMapKind + e = v.load(true, true, needleMapKind) return } func (v *Volume) String() string { return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.dataFile, v.nm, v.readOnly) } -func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) { +func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = SuperBlock{} - e = v.load(false, false, false) + v.needleMapKind = needleMapKind + e = v.load(false, false, needleMapKind) return } func (v *Volume) FileName() (fileName string) { @@ -51,7 +54,7 @@ func (v *Volume) FileName() (fileName string) { } return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bool) error { +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType) error { var e error fileName := v.FileName() @@ -99,16 +102,22 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, useLevelDb bo return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e) } } - if !useLevelDb { + switch needleMapKind { + case NeedleMapInMemory: glog.V(0).Infoln("loading index file", fileName+".idx", "readonly", v.readOnly) if v.nm, e = LoadNeedleMap(indexFile); e != nil { glog.V(0).Infof("loading index %s error: %v", fileName+".idx", e) } - } else { + case NeedleMapLevelDb: glog.V(0).Infoln("loading leveldb file", fileName+".ldb") if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } + case NeedleMapBoltDb: + glog.V(0).Infoln("loading boltdb file", fileName+".bdb") + if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { + glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) + } } } return e @@ -263,11 +272,12 @@ func (v *Volume) read(n *Needle) (int, error) { } func ScanVolumeFile(dirname string, collection string, id VolumeId, + needleMapKind NeedleMapType, visitSuperBlock func(SuperBlock) error, readNeedleBody bool, visitNeedle func(n *Needle, offset int64) error) (err error) { var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, collection, id); err != nil { + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { return fmt.Errorf("Failed to load volume %d: %v", id, err) } if err = visitSuperBlock(v.SuperBlock); err != nil { diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 9f6f8e35f..eab138000 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -38,7 +38,7 @@ func (v *Volume) commitCompact() error { } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - if e = v.load(true, false, false); e != nil { + if e = v.load(true, false, v.needleMapKind); e != nil { return e } return nil @@ -63,27 +63,28 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro now := uint64(time.Now().Unix()) - err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { - _, err = dst.Write(superBlock.Bytes()) - return err - }, true, func(n *Needle, offset int64) error { - if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { - return nil - } - nv, ok := v.nm.Get(n.Id) - glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { - if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { - return fmt.Errorf("cannot put needle: %s", err) + err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, + func(superBlock SuperBlock) error { + _, err = dst.Write(superBlock.Bytes()) + return err + }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil } - if _, err = n.Append(dst, v.Version()); err != nil { - return fmt.Errorf("cannot append needle: %s", err) + nv, ok := v.nm.Get(n.Id) + glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) + if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { + if err = nm.Put(n.Id, uint32(new_offset/NeedlePaddingSize), n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) + } + if _, err = n.Append(dst, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + new_offset += n.DiskSize() + glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) } - new_offset += n.DiskSize() - glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", new_offset, "data_size", n.Size) - } - return nil - }) + return nil + }) return } |
