aboutsummaryrefslogtreecommitdiff
path: root/go/storage/needle_map.go
blob: 41f53b90b411e4229709c58e7b85ae165b5203c5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package storage

import (
	"encoding/binary"
	"fmt"
	"io/ioutil"
	"os"
	"sync"
)

type NeedleMapType int

const (
	NeedleMapInMemory NeedleMapType = iota
	NeedleMapLevelDb
	NeedleMapBoltDb
)

type NeedleMapper interface {
	Put(key uint64, offset uint32, size uint32) error
	Get(key uint64) (element *NeedleValue, ok bool)
	Delete(key uint64) error
	Close()
	Destroy() error
	ContentSize() uint64
	DeletedSize() uint64
	FileCount() int
	DeletedCount() int
	MaxFileKey() uint64
	IndexFileSize() uint64
	IndexFileContent() ([]byte, error)
	IndexFileName() string
}

type baseNeedleMapper struct {
	indexFile           *os.File
	indexFileAccessLock sync.Mutex

	mapMetric
}

func (nm baseNeedleMapper) IndexFileSize() uint64 {
	stat, err := nm.indexFile.Stat()
	if err == nil {
		return uint64(stat.Size())
	}
	return 0
}

func (nm baseNeedleMapper) IndexFileName() string {
	return nm.indexFile.Name()
}

func idxFileEntry(bytes []byte) (key uint64, offset uint32, size uint32) {
	key = binary.BigEndian.Uint64(bytes[:8])
	offset = binary.BigEndian.Uint32(bytes[8:12])
	size = binary.BigEndian.Uint32(bytes[12:16])
	return
}
func (nm baseNeedleMapper) appendToIndexFile(key uint64, offset uint32, size uint32) error {
	bytes := make([]byte, 16)
	binary.BigEndian.PutUint64(bytes[0:8], key)
	binary.BigEndian.PutUint32(bytes[8:12], offset)
	binary.BigEndian.PutUint32(bytes[12:16], size)

	nm.indexFileAccessLock.Lock()
	defer nm.indexFileAccessLock.Unlock()
	if _, err := nm.indexFile.Seek(0, 2); err != nil {
		return fmt.Errorf("cannot seek end of indexfile %s: %v",
			nm.indexFile.Name(), err)
	}
	_, err := nm.indexFile.Write(bytes)
	return err
}
func (nm baseNeedleMapper) IndexFileContent() ([]byte, error) {
	nm.indexFileAccessLock.Lock()
	defer nm.indexFileAccessLock.Unlock()
	return ioutil.ReadFile(nm.indexFile.Name())
}

type mapMetric struct {
	indexFile *os.File

	DeletionCounter     int    `json:"DeletionCounter"`
	FileCounter         int    `json:"FileCounter"`
	DeletionByteCounter uint64 `json:"DeletionByteCounter"`
	FileByteCounter     uint64 `json:"FileByteCounter"`
	MaximumFileKey      uint64 `json:"MaxFileKey"`
}

func (mm *mapMetric) logDelete(deletedByteCount uint32) {
	mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
	mm.DeletionCounter++
}

func (mm *mapMetric) logPut(key uint64, oldSize uint32, newSize uint32) {
	if key > mm.MaximumFileKey {
		mm.MaximumFileKey = key
	}
	mm.FileCounter++
	mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
	if oldSize > 0 {
		mm.DeletionCounter++
		mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
	}
}

func (mm mapMetric) ContentSize() uint64 {
	return mm.FileByteCounter
}
func (mm mapMetric) DeletedSize() uint64 {
	return mm.DeletionByteCounter
}
func (mm mapMetric) FileCount() int {
	return mm.FileCounter
}
func (mm mapMetric) DeletedCount() int {
	return mm.DeletionCounter
}
func (mm mapMetric) MaxFileKey() uint64 {
	return mm.MaximumFileKey
}