aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed-fs/src/pkg/storage/cdb_map.go107
-rw-r--r--weed-fs/src/pkg/storage/compact_map.go4
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go112
-rw-r--r--weed-fs/src/pkg/storage/volume.go4
4 files changed, 195 insertions, 32 deletions
diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go
new file mode 100644
index 000000000..b80b3cde9
--- /dev/null
+++ b/weed-fs/src/pkg/storage/cdb_map.go
@@ -0,0 +1,107 @@
+package storage
+
+import (
+ "github.com/tgulacsi/go-cdb"
+ "io"
+ "log"
+ "os"
+ "pkg/util"
+ "strings"
+)
+
+// CdbMap is a needle map backed by a CDB file opened through go-cdb.
+type CdbMap struct {
+ db *cdb.Cdb // handle returned by cdb.Open in NewCdbMap
+ transient []byte // 8-byte scratch buffer that Get fills with the encoded key
+ Filename string // path of the CDB file; Walk reopens it from here
+}
+
+// NewCdbMap opens the CDB file and serves it as a needle map.
+// The error from cdb.Open is returned unchanged, with a nil map.
+func NewCdbMap(filename string) (*CdbMap, error) {
+	db, err := cdb.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	cm := &CdbMap{
+		db:        db,
+		transient: make([]byte, 8),
+		Filename:  filename,
+	}
+	return cm, nil
+}
+
+// NewCdbMapFromIndex writes the content of the index file to a CDB file and
+// returns a CdbMap opened on it.
+//
+// The CDB file gets the name of the index file with its extension replaced by
+// "cdb" (e.g. "1.idx" becomes "1.cdb"); ".cdb" is appended when the name has
+// no extension. The index is read twice from the start: the first pass
+// collects the keys whose last row is a deletion (offset 0), the second pass
+// copies the live rows of all other keys.
+//
+// NOTE(review): a key that was stored more than once still ends up with
+// several records in the CDB - confirm which one go-cdb returns on lookup.
+func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
+	// Keep everything up to and including the last dot of the base name.
+	// The separator check avoids cutting at a dot in a directory name.
+	nm := indexFile.Name()
+	if i := strings.LastIndex(nm, "."); i >= 0 && !strings.ContainsAny(nm[i:], "/\\") {
+		nm = nm[:i+1] + "cdb"
+	} else {
+		nm += ".cdb"
+	}
+
+	var (
+		key    uint64
+		offset uint32
+	)
+	deleted := make(map[uint64]bool, 16)
+	gatherDeletes := func(buf []byte) error {
+		key = util.BytesToUint64(buf[:8])
+		offset = util.BytesToUint32(buf[8:12])
+		if offset > 0 {
+			delete(deleted, key) // undelete; a no-op when the key is absent
+		} else {
+			deleted[key] = true
+		}
+		return nil
+	}
+	// Both passes must see the same rows, so rewind before each of them.
+	if _, err := indexFile.Seek(0, 0); err != nil {
+		return nil, err
+	}
+	if err := readIndexFile(indexFile, gatherDeletes); err != nil {
+		return nil, err
+	}
+
+	w, err := cdb.NewWriter(nm)
+	if err != nil {
+		return nil, err
+	}
+	iterFun := func(buf []byte) error {
+		key = util.BytesToUint64(buf[:8])
+		offset = util.BytesToUint32(buf[8:12])
+		// Skip deletion rows and every row of a key that ends up deleted.
+		if offset == 0 || deleted[key] {
+			return nil
+		}
+		// buf is a window into the read buffer that readIndexFile reuses;
+		// hand the writer its own copy in case it keeps the slices beyond
+		// this call (TODO confirm against go-cdb).
+		row := append([]byte(nil), buf[:16]...)
+		w.PutPair(row[:8], row[8:16])
+		return nil
+	}
+	if _, err = indexFile.Seek(0, 0); err != nil {
+		w.Close()
+		return nil, err
+	}
+	err = readIndexFile(indexFile, iterFun)
+	w.Close()
+	if err != nil {
+		return nil, err
+	}
+
+	return NewCdbMap(nm)
+}
+
+// Get looks up key in the CDB file.
+//
+// On success it returns the stored offset and size wrapped in a NeedleValue
+// and true. It returns nil and false when the lookup fails: io.EOF from the
+// database is treated as "not found" and stays silent, any other error and a
+// record shorter than 8 bytes are logged.
+//
+// Get encodes the key into the shared m.transient buffer, so calls on the
+// same CdbMap must not run concurrently.
+func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) {
+	util.Uint64toBytes(m.transient, uint64(key))
+	data, err := m.db.Data(m.transient)
+	if err != nil {
+		if err != io.EOF {
+			log.Printf("error getting %d: %s", uint64(key), err)
+		}
+		return nil, false
+	}
+	// A record holds offset (4 bytes) followed by size (4 bytes); refuse
+	// anything shorter instead of panicking on the slice expressions below.
+	if len(data) < 8 {
+		log.Printf("error getting %d: short record (%d bytes)", uint64(key), len(data))
+		return nil, false
+	}
+	return &NeedleValue{Key: key,
+		Offset: util.BytesToUint32(data[:4]),
+		Size:   util.BytesToUint32(data[4:8]),
+	}, true
+}
+
+// Walk reopens the CDB file named by m.Filename and passes it to cdb.DumpMap.
+// Every element DumpMap reports is decoded into a NeedleValue (8-byte key,
+// 4-byte offset, 4-byte size) and handed to pedestrian, whose result is
+// returned to DumpMap. Walk returns the error from os.Open or from DumpMap.
+func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
+	f, err := os.Open(m.Filename)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	return cdb.DumpMap(f, func(rec cdb.Element) error {
+		nv := &NeedleValue{
+			Key:    Key(util.BytesToUint64(rec.Key[:8])),
+			Offset: util.BytesToUint32(rec.Data[:4]),
+			Size:   util.BytesToUint32(rec.Data[4:8]),
+		}
+		return pedestrian(nv)
+	})
+}
diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go
index 90ed42198..61cc2c841 100644
--- a/weed-fs/src/pkg/storage/compact_map.go
+++ b/weed-fs/src/pkg/storage/compact_map.go
@@ -109,8 +109,8 @@ type CompactMap struct {
list []CompactSection
}
-func NewCompactMap() CompactMap {
- return CompactMap{}
+// NewCompactMap returns a pointer to a new, empty CompactMap.
+func NewCompactMap() *CompactMap {
+ return &CompactMap{}
 }
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index 9ce38c1f2..9d7369509 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -1,6 +1,7 @@
package storage
import (
+ "io"
"log"
"os"
"pkg/util"
@@ -8,7 +9,8 @@ import (
type NeedleMap struct {
indexFile *os.File
- m CompactMap
+ m MapGetSetter // modifiable map
+ fm MapGetter // frozen map
//transient
bytes []byte
@@ -19,52 +21,106 @@ type NeedleMap struct {
fileByteCounter uint64
}
+// MapGetter is the read-only map interface, used for frozen maps.
+type MapGetter interface {
+ // Get returns the element stored under key; ok is false when no element
+ // could be returned.
+ Get(key Key) (element *NeedleValue, ok bool)
+ // Walk calls pedestrian with the elements of the map.
+ Walk(pedestrian func(*NeedleValue) error) error
+}
+
+// MapSetter is the modifying half of the map interface.
+type MapSetter interface {
+ // Set stores offset and size under key and returns oldsize; LoadNeedleMap
+ // counts a non-zero oldsize as a replaced (deleted) entry.
+ Set(key Key, offset, size uint32) (oldsize uint32)
+ // Delete removes key.
+ // NOTE(review): the uint32 result is presumably the size of the removed
+ // entry - confirm against the implementations.
+ Delete(key Key) uint32
+}
+
+// MapGetSetter combines MapGetter and MapSetter: a map that can be both read
+// and modified. It is the type of NeedleMap's modifiable map field.
+type MapGetSetter interface {
+ MapGetter
+ MapSetter
+}
+
+// NewNeedleMap returns a new in-memory needle map backed by the index file
+// "file". The map starts with a fresh CompactMap and a 16-byte scratch buffer.
 func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
+ return &NeedleMap{
 m: NewCompactMap(),
 bytes: make([]byte, 16),
 indexFile: file,
 }
- return nm
+}
+
+// NewFrozenNeedleMap returns a new frozen (on-disk, not modifiable(!)) needle
+// map, built by converting the index file to a CDB via NewCdbMapFromIndex.
+//
+// NOTE(review): only fm and bytes are set here; m and indexFile keep their
+// zero values, so methods relying on them presumably must not be called on a
+// frozen map - confirm.
+func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
+ fm, err := NewCdbMapFromIndex(file)
+ if err != nil {
+ return nil, err
+ }
+ return &NeedleMap{
+ fm: fm,
+ bytes: make([]byte, 16),
+ }, nil
 }
+// RowsToRead is the number of 16-byte index rows the read buffer holds, i.e.
+// the batch size used when reading the index file.
 const (
 RowsToRead = 1024
 )
-func LoadNeedleMap(file *os.File) *NeedleMap {
+// LoadNeedleMap builds an in-memory NeedleMap by replaying the rows of the
+// index file through readIndexFile.
+//
+// Each row is decoded as key (bytes 0-8), offset (bytes 8-12) and size
+// (bytes 12-16). A row with offset > 0 stores the entry, a row with offset 0
+// deletes the key; the file and deletion counters of the map are updated
+// along the way. An error from readIndexFile is returned with a nil map.
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
 nm := NewNeedleMap(file)
- bytes := make([]byte, 16*RowsToRead)
- count, e := nm.indexFile.Read(bytes)
- if count > 0 {
- fstat, _ := file.Stat()
- log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+
+ var (
+ key uint64
+ offset, size, oldSize uint32
+ )
+ iterFun := func(buf []byte) error {
+ key = util.BytesToUint64(buf[:8])
+ offset = util.BytesToUint32(buf[8:12])
+ size = util.BytesToUint32(buf[12:16])
+ // every row counts towards the file totals, deletion rows included
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize = nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ // a non-zero oldSize is counted as one deleted entry of that size
+ if oldSize > 0 {
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ // offset 0 marks a deletion row
+ nm.m.Delete(Key(key))
+ //log.Println("removing key", key)
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
+ }
+
+ return nil
+ }
+ if err := readIndexFile(file, iterFun); err != nil {
+ return nil, err
+ }
+ return nm, nil
+}
+
+// readIndexFile reads indexFile from its current position to the end and
+// calls iterFun with each complete row (raw 16 bytes).
+//
+// The slice passed to iterFun points into a buffer that is reused for the
+// next batch, so iterFun must not keep it after returning. The first error
+// returned by iterFun stops the iteration and is returned. Reaching the end
+// of the file (io.EOF, or io.ErrUnexpectedEOF for a trailing partial row) ends
+// the iteration with a nil result; any other read error is returned.
+func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
+ buf := make([]byte, 16*RowsToRead)
+ count, e := io.ReadAtLeast(indexFile, buf, 16)
+ // announce the load only when the first read actually delivered rows
+ if e == nil && count > 0 {
+ fstat, err := indexFile.Stat()
+ if err != nil {
+ log.Printf("ERROR stating %s: %s", indexFile.Name(), err)
+ } else {
+ log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+ }
 }
 for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- nm.fileCounter++
- nm.fileByteCounter = nm.fileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- } else {
- nm.m.Delete(Key(key))
- //log.Println("removing key", key)
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
+ // hand out complete rows only: a short tail would be padded with stale
+ // bytes left in buf by the previous batch
+ for i := 0; i+16 <= count; i += 16 {
+ if e = iterFun(buf[i : i+16]); e != nil {
+ return e
 }
 }
- count, e = nm.indexFile.Read(bytes)
+ count, e = io.ReadAtLeast(indexFile, buf, 16)
 }
- return nm
+ // end of file is the normal way out of the loop; everything else is a
+ // genuine read failure the caller has to see
+ if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
+ return e
+ }
+ return nil
 }
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 5e64d0763..71dfb5aee 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -48,8 +48,8 @@ func (v *Volume) load() error {
if ie != nil {
return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
}
- v.nm = LoadNeedleMap(indexFile)
- return nil
+ v.nm, e = LoadNeedleMap(indexFile)
+ return e
}
func (v *Volume) Version() Version {
return v.version