aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/needle_map_sorted_file.go
blob: 5bd67ea86586bf16a694dd3457462f9cc1f08ddb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package storage

import (
	"os"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
)

type SortedFileNeedleMap struct {
	baseNeedleMapper
	baseFileName string
	dbFile       *os.File
	dbFileSize   int64
}

func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
	m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
	m.indexFile = indexFile
	fileName := indexBaseFileName + ".sdx"
	if !isSortedFileFresh(fileName, indexFile) {
		glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
		erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx")
		glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
	}
	glog.V(1).Infof("Opening %s...", fileName)

	if m.dbFile, err = os.OpenFile(indexBaseFileName+".sdx", os.O_RDWR, 0); err != nil {
		return
	}
	dbStat, _ := m.dbFile.Stat()
	m.dbFileSize = dbStat.Size()
	glog.V(1).Infof("Loading %s...", indexFile.Name())
	mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
	if indexLoadError != nil {
		_ = m.dbFile.Close()
		return nil, indexLoadError
	}
	m.mapMetric = *mm
	return
}

func isSortedFileFresh(dbFileName string, indexFile *os.File) bool {
	// normally we always write to index file first
	dbFile, err := os.Open(dbFileName)
	if err != nil {
		return false
	}
	defer dbFile.Close()
	dbStat, dbStatErr := dbFile.Stat()
	indexStat, indexStatErr := indexFile.Stat()
	if dbStatErr != nil || indexStatErr != nil {
		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
		return false
	}

	return dbStat.ModTime().After(indexStat.ModTime())
}

func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
	offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
	ok = err == nil
	return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, ok

}

func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
	return os.ErrInvalid
}

func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {

	_, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)

	if err != nil {
		if err == erasure_coding.NotFoundError {
			return nil
		}
		return err
	}

	if size.IsDeleted() {
		return nil
	}

	// write to index file first
	if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
		return err
	}
	_, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)

	return err
}

func (m *SortedFileNeedleMap) Close() {
	if m == nil {
		return
	}
	if m.indexFile != nil {
		m.indexFile.Close()
	}
	if m.dbFile != nil {
		m.dbFile.Close()
	}
}

func (m *SortedFileNeedleMap) Destroy() error {
	m.Close()
	os.Remove(m.indexFile.Name())
	return os.Remove(m.baseFileName + ".sdx")
}