author    Chris Lu <chris.lu@gmail.com>  2019-12-18 01:21:21 -0800
committer Chris Lu <chris.lu@gmail.com>  2019-12-18 01:21:21 -0800
commit    58f88e530cf446976d88884227f7904b166a3953 (patch)
tree      a6c32f00e38399c7f86f17203424b88d9e7c21eb
parent    2f21beaccd93813ce1b7e611020cc51333bb62f3 (diff)
volume: use sorted index map for readonly volumes
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go          4
-rw-r--r--  weed/storage/erasure_coding/ec_encoder.go          8
-rw-r--r--  weed/storage/erasure_coding/ec_test.go             6
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go            4
-rw-r--r--  weed/storage/erasure_coding/ec_volume_delete.go    10
-rw-r--r--  weed/storage/needle_map_sorted_file.go             105
-rw-r--r--  weed/storage/volume_loading.go                      81
7 files changed, 165 insertions(+), 53 deletions(-)
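
The idea behind the change: a readonly volume's needle index no longer needs a mutable map in memory or LevelDB. Instead, the existing .idx file is rewritten once as a file of fixed-size entries sorted by needle id (the new .sdb file), and lookups binary-search that file directly on disk. A minimal, self-contained Go sketch of the lookup technique follows; the 16-byte entry layout and all names here are illustrative, not the SeaweedFS types.

    package main

    import (
        "encoding/binary"
        "fmt"
        "os"
    )

    // Illustrative layout: 8-byte key, 4-byte offset, 4-byte size per entry.
    // SeaweedFS uses its own NeedleMapEntrySize; this is the same idea in miniature.
    const entrySize = 16

    // lookup binary-searches a file of fixed-size entries sorted by key.
    func lookup(f *os.File, fileSize int64, key uint64) (offset, size uint32, found bool, err error) {
        buf := make([]byte, entrySize)
        lo, hi := int64(0), fileSize/entrySize
        for lo < hi {
            mid := (lo + hi) / 2
            if _, err = f.ReadAt(buf, mid*entrySize); err != nil {
                return
            }
            k := binary.BigEndian.Uint64(buf[0:8])
            switch {
            case k < key:
                lo = mid + 1
            case k > key:
                hi = mid
            default:
                return binary.BigEndian.Uint32(buf[8:12]), binary.BigEndian.Uint32(buf[12:16]), true, nil
            }
        }
        return 0, 0, false, nil
    }

    func main() {
        f, _ := os.CreateTemp("", "sorted-index-*.sdb")
        defer os.Remove(f.Name())

        // Entries must already be sorted by key, ascending.
        for _, e := range []struct {
            key          uint64
            offset, size uint32
        }{{3, 100, 10}, {7, 200, 20}, {42, 300, 30}} {
            buf := make([]byte, entrySize)
            binary.BigEndian.PutUint64(buf[0:8], e.key)
            binary.BigEndian.PutUint32(buf[8:12], e.offset)
            binary.BigEndian.PutUint32(buf[12:16], e.size)
            f.Write(buf)
        }

        st, _ := f.Stat()
        off, sz, ok, _ := lookup(f, st.Size(), 7)
        fmt.Println(off, sz, ok) // 200 20 true
    }

The trade-off is the usual one: O(log n) disk reads per lookup instead of O(1) memory hits, in exchange for near-zero memory use on volumes that no longer accept writes.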
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 242480197..6ded84cc3 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -47,8 +47,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
// write .ec01 ~ .ec14 files
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 97010a1ed..75369dc8d 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -21,16 +21,16 @@ const (
ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
)
-// WriteSortedEcxFile generates .ecx file from existing .idx file
+// WriteSortedFileFromIdx generates .ecx file from existing .idx file
// all keys are sorted in ascending order
-func WriteSortedEcxFile(baseFileName string) (e error) {
+func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
cm, err := readCompactMap(baseFileName)
if err != nil {
return fmt.Errorf("readCompactMap: %v", err)
}
- ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
+ ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open ecx file: %v", err)
}
@@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
})
if err != nil {
- return fmt.Errorf("failed to visit ecx file: %v", err)
+ return fmt.Errorf("failed to visit idx file: %v", err)
}
return nil
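
WriteSortedFileFromIdx is the former WriteSortedEcxFile with the output extension turned into a parameter, so the same routine can emit either the erasure-coding index (.ecx) or the new sorted needle map (.sdb). A hedged usage sketch, assuming only the package path and signature shown in this diff:

    package example // illustrative only

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
    )

    // writeSortedIndexes regenerates both sorted index flavors for one volume.
    // baseFileName is the volume path without extension; baseFileName+".idx" must exist.
    func writeSortedIndexes(baseFileName string) error {
        // EC index, consumed by the erasure-coding read path (as in VolumeEcShardsGenerate above).
        if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
            return fmt.Errorf("WriteSortedFileFromIdx .ecx %s: %v", baseFileName, err)
        }
        // Sorted needle map, consumed by SortedFileNeedleMap for readonly volumes (below).
        if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdb"); err != nil {
            return fmt.Errorf("WriteSortedFileFromIdx .sdb %s: %v", baseFileName, err)
        }
        return nil
    }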
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 57df09525..0e4aaa27c 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -26,14 +26,14 @@ func TestEncodingDecoding(t *testing.T) {
t.Logf("generateEcFiles: %v", err)
}
- err = WriteSortedEcxFile(baseFileName)
+ err = WriteSortedFileFromIdx(baseFileName, ".ecx")
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
err = validateFiles(baseFileName)
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
removeGeneratedFiles(baseFileName)
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index bcae164ca..e8ebe6204 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -186,10 +186,10 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
}
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
- return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+ return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
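
The change in this file is purely about visibility: the binary search over the sorted index is exported so the new SortedFileNeedleMap can reuse it for .sdb files. With a nil callback it is a plain lookup; a hedged sketch of the read path, using the exported signature above (imports as elsewhere in this commit):

    // lookupNeedle mirrors FindNeedleFromEcx; illustrative only.
    func lookupNeedle(sortedIndex *os.File, indexSize int64, id types.NeedleId) (types.Offset, uint32, error) {
        // nil callback: a pure binary search over NeedleMapEntrySize-wide entries,
        // returning erasure_coding.NotFoundError when the id is absent.
        return erasure_coding.SearchNeedleFromSortedIndex(sortedIndex, indexSize, id, nil)
    }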
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 04102ec9e..822a9e923 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -10,15 +10,15 @@ import (
)
var (
- markNeedleDeleted = func(file *os.File, offset int64) error {
+ MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
util.Uint32toBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
- return fmt.Errorf("ecx write error: %v", err)
+ return fmt.Errorf("sorted needle write error: %v", err)
}
if n != types.SizeSize {
- return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
+ return fmt.Errorf("sorted needle written %d bytes, expecting %d", n, types.SizeSize)
}
return nil
}
@@ -26,7 +26,7 @@ var (
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
- _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil {
if err == NotFoundError {
@@ -81,7 +81,7 @@ func RebuildEcxFile(baseFileName string) error {
needleId := types.BytesToNeedleId(buf)
- _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil && err != NotFoundError {
ecxFile.Close()
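
MarkNeedleDeleted is exported for the same reason: it is the optional callback that SearchNeedleFromSortedIndex runs on the matching entry, overwriting the entry's size field with TombstoneFileSize in place (at offset + NeedleIdSize + OffsetSize), so the file stays sorted. A hedged sketch of the delete path, reusing only names from this diff:

    // tombstoneNeedle mirrors DeleteNeedleFromEcx and SortedFileNeedleMap.Delete; illustrative only.
    func tombstoneNeedle(sortedIndex *os.File, indexSize int64, id types.NeedleId) error {
        _, _, err := erasure_coding.SearchNeedleFromSortedIndex(sortedIndex, indexSize, id, erasure_coding.MarkNeedleDeleted)
        if err == erasure_coding.NotFoundError {
            return nil // nothing to tombstone
        }
        return err
    }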
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
new file mode 100644
index 000000000..6b3d9c449
--- /dev/null
+++ b/weed/storage/needle_map_sorted_file.go
@@ -0,0 +1,105 @@
+package storage
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type SortedFileNeedleMap struct {
+ baseNeedleMapper
+ baseFileName string
+ dbFile *os.File
+ dbFileSize int64
+}
+
+func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
+ m = &SortedFileNeedleMap{baseFileName: baseFileName}
+ m.indexFile = indexFile
+ fileName := baseFileName + ".sdb"
+ if !isSortedFileFresh(fileName, indexFile) {
+ glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
+ erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdb")
+ glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", fileName)
+
+ if m.dbFile, err = os.Open(baseFileName + ".sdb"); err != nil {
+ return
+ }
+ dbStat, _ := m.dbFile.Stat()
+ m.dbFileSize = dbStat.Size()
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = *mm
+ return
+}
+
+func isSortedFileFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbFile, err := os.Open(dbFileName)
+ if err != nil {
+ return false
+ }
+ defer dbFile.Close()
+ dbStat, dbStatErr := dbFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
+ offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
+ ok = err == nil
+ return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, ok
+
+}
+
+func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+ return os.ErrInvalid
+}
+
+func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
+
+ _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
+
+ if err != nil {
+ if err == erasure_coding.NotFoundError {
+ return nil
+ }
+ return err
+ }
+
+ if size == TombstoneFileSize {
+ return nil
+ }
+
+ // write to index file first
+ if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
+ return err
+ }
+ _, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
+
+ return err
+}
+
+func (m *SortedFileNeedleMap) Close() {
+ m.indexFile.Close()
+ m.dbFile.Close()
+}
+
+func (m *SortedFileNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.baseFileName + ".sdb")
+}
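
How the new map is intended to be used (see volume_loading.go below): for a readonly volume the loader hands the open .idx file to NewSortedFileNeedleMap, which regenerates the .sdb file when it is missing or older than the index, then serves lookups by binary search. A hedged usage sketch; the flags, variable names, and needleId value are illustrative, not the actual loader code:

    // openReadonlyNeedleMap is illustrative only; assumes the storage and types packages from this repo.
    func openReadonlyNeedleMap(baseFileName string, needleId types.NeedleId) error {
        indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDWR, 0644)
        if err != nil {
            return err
        }
        // (Re)writes baseFileName+".sdb" from the .idx file when missing or stale.
        nm, err := storage.NewSortedFileNeedleMap(baseFileName, indexFile)
        if err != nil {
            return err
        }
        defer nm.Close()

        if v, ok := nm.Get(needleId); ok {
            _ = v // v.Offset and v.Size come straight from the sorted .sdb file
        }
        // Put returns os.ErrInvalid: the volume is readonly.
        // Delete appends a tombstone to the .idx file, then marks the .sdb entry in place.
        return nil
    }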
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index e09f848ab..9ec6b14be 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -5,12 +5,12 @@ import (
"os"
"time"
- "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
@@ -88,41 +88,48 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.readOnly = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
}
- switch needleMapKind {
- case NeedleMapInMemory:
- glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
- if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
- glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
- }
- case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
- }
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
- }
- case NeedleMapLevelDbMedium:
- glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
- }
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
- }
- case NeedleMapLevelDbLarge:
- glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
+
+ if v.readOnly {
+ if v.nm, e = NewSortedFileNeedleMap(fileName, indexFile); e != nil {
+ glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdb", e)
}
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ } else {
+ switch needleMapKind {
+ case NeedleMapInMemory:
+ glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
+ if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
+ glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
+ }
+ case NeedleMapLevelDb:
+ glog.V(0).Infoln("loading leveldb", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
+ case NeedleMapLevelDbMedium:
+ glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
+ case NeedleMapLevelDbLarge:
+ glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ }
}
}
}