aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/storage/needle/compact_map.go159
-rw-r--r--weed/storage/needle/compact_map_test.go48
-rw-r--r--weed/storage/types/needle_types.go36
3 files changed, 139 insertions, 104 deletions
diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle/compact_map.go
index bca698407..cb0bf2e51 100644
--- a/weed/storage/needle/compact_map.go
+++ b/weed/storage/needle/compact_map.go
@@ -15,27 +15,36 @@ type SectionalNeedleId uint32
const SectionalNeedleIdLimit = 1<<32 - 1
type SectionalNeedleValue struct {
- Key SectionalNeedleId
- Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
+ Key SectionalNeedleId
+ OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
+ Size uint32 `comment:"Size of the data portion"`
+}
+
+type SectionalNeedleValueExtra struct {
+ OffsetHigher OffsetHigher
}
type CompactSection struct {
sync.RWMutex
- values []SectionalNeedleValue
- overflow Overflow
- start NeedleId
- end NeedleId
- counter int
+ values []SectionalNeedleValue
+ valuesExtra []SectionalNeedleValueExtra
+ overflow Overflow
+ overflowExtra OverflowExtra
+ start NeedleId
+ end NeedleId
+ counter int
}
type Overflow []SectionalNeedleValue
+type OverflowExtra []SectionalNeedleValueExtra
func NewCompactSection(start NeedleId) *CompactSection {
return &CompactSection{
- values: make([]SectionalNeedleValue, batch),
- overflow: Overflow(make([]SectionalNeedleValue, 0)),
- start: start,
+ values: make([]SectionalNeedleValue, batch),
+ valuesExtra: make([]SectionalNeedleValueExtra, batch),
+ overflow: Overflow(make([]SectionalNeedleValue, 0)),
+ overflowExtra: OverflowExtra(make([]SectionalNeedleValueExtra, 0)),
+ start: start,
}
}
@@ -47,21 +56,21 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs
}
skey := SectionalNeedleId(key - cs.start)
if i := cs.binarySearchValues(skey); i >= 0 {
- oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size
+ oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size
//println("key", key, "old size", ret)
- cs.values[i].Offset, cs.values[i].Size = offset, size
+ cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size
} else {
needOverflow := cs.counter >= batch
needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
if needOverflow {
//println("start", cs.start, "counter", cs.counter, "key", key)
- if oldValue, found := cs.overflow.findOverflowEntry(skey); found {
- oldOffset, oldSize = oldValue.Offset, oldValue.Size
+ if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
+ oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
}
- cs.overflow = cs.overflow.setOverflowEntry(SectionalNeedleValue{Key: skey, Offset: offset, Size: size})
+ cs.setOverflowEntry(skey, offset, size)
} else {
p := &cs.values[cs.counter]
- p.Key, p.Offset, p.Size = skey, offset, size
+ p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
//println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
cs.counter++
}
@@ -70,6 +79,50 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs
return
}
+func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) {
+ needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size}
+ needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: OffsetHigher{}}
+ insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
+ return cs.overflow[i].Key >= needleValue.Key
+ })
+ if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key {
+ cs.overflow[insertCandidate] = needleValue
+ } else {
+ cs.overflow = append(cs.overflow, needleValue)
+ cs.overflowExtra = append(cs.overflowExtra, needleValueExtra)
+ for i := len(cs.overflow) - 1; i > insertCandidate; i-- {
+ cs.overflow[i] = cs.overflow[i-1]
+ cs.overflowExtra[i] = cs.overflowExtra[i-1]
+ }
+ cs.overflow[insertCandidate] = needleValue
+ }
+}
+
+func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nve SectionalNeedleValueExtra, nv SectionalNeedleValue, found bool) {
+ foundCandidate := sort.Search(len(cs.overflow), func(i int) bool {
+ return cs.overflow[i].Key >= key
+ })
+ if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key {
+ return cs.overflowExtra[foundCandidate], cs.overflow[foundCandidate], true
+ }
+ return nve, nv, false
+}
+
+func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
+ length := len(cs.overflow)
+ deleteCandidate := sort.Search(length, func(i int) bool {
+ return cs.overflow[i].Key >= key
+ })
+ if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
+ for i := deleteCandidate; i < length-1; i++ {
+ cs.overflow[i] = cs.overflow[i+1]
+ cs.overflowExtra[i] = cs.overflowExtra[i+1]
+ }
+ cs.overflow = cs.overflow[0 : length-1]
+ cs.overflowExtra = cs.overflowExtra[0 : length-1]
+ }
+}
+
//return old entry size
func (cs *CompactSection) Delete(key NeedleId) uint32 {
skey := SectionalNeedleId(key - cs.start)
@@ -81,8 +134,8 @@ func (cs *CompactSection) Delete(key NeedleId) uint32 {
cs.values[i].Size = TombstoneFileSize
}
}
- if v, found := cs.overflow.findOverflowEntry(skey); found {
- cs.overflow = cs.overflow.deleteOverflowEntry(skey)
+ if _, v, found := cs.findOverflowEntry(skey); found {
+ cs.deleteOverflowEntry(skey)
ret = v.Size
}
cs.Unlock()
@@ -91,14 +144,14 @@ func (cs *CompactSection) Delete(key NeedleId) uint32 {
func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) {
cs.RLock()
skey := SectionalNeedleId(key - cs.start)
- if v, ok := cs.overflow.findOverflowEntry(skey); ok {
+ if ve, v, ok := cs.findOverflowEntry(skey); ok {
cs.RUnlock()
- nv := v.toNeedleValue(cs)
+ nv := toNeedleValue(ve, v, cs)
return &nv, true
}
if i := cs.binarySearchValues(skey); i >= 0 {
cs.RUnlock()
- nv := cs.values[i].toNeedleValue(cs)
+ nv := toNeedleValue(cs.valuesExtra[i], cs.values[i], cs)
return &nv, true
}
cs.RUnlock()
@@ -194,8 +247,8 @@ func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
for _, cs := range cm.list {
cs.RLock()
- for _, v := range cs.overflow {
- if err := visit(v.toNeedleValue(cs)); err != nil {
+ for i, v := range cs.overflow {
+ if err := visit(toNeedleValue(cs.overflowExtra[i], v, cs)); err != nil {
cs.RUnlock()
return err
}
@@ -204,8 +257,8 @@ func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
if i >= cs.counter {
break
}
- if _, found := cs.overflow.findOverflowEntry(v.Key); !found {
- if err := visit(v.toNeedleValue(cs)); err != nil {
+ if _, _, found := cs.findOverflowEntry(v.Key); !found {
+ if err := visit(toNeedleValue(cs.valuesExtra[i], v, cs)); err != nil {
cs.RUnlock()
return err
}
@@ -216,50 +269,20 @@ func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
return nil
}
-func (o Overflow) deleteOverflowEntry(key SectionalNeedleId) Overflow {
- length := len(o)
- deleteCandidate := sort.Search(length, func(i int) bool {
- return o[i].Key >= key
- })
- if deleteCandidate != length && o[deleteCandidate].Key == key {
- for i := deleteCandidate; i < length-1; i++ {
- o[i] = o[i+1]
- }
- o = o[0 : length-1]
+func toNeedleValue(snve SectionalNeedleValueExtra, snv SectionalNeedleValue, cs *CompactSection) NeedleValue {
+ offset := Offset{
+ OffsetHigher: snve.OffsetHigher,
+ OffsetLower: snv.OffsetLower,
}
- return o
+ return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size}
}
-func (o Overflow) setOverflowEntry(needleValue SectionalNeedleValue) Overflow {
- insertCandidate := sort.Search(len(o), func(i int) bool {
- return o[i].Key >= needleValue.Key
- })
- if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key {
- o[insertCandidate] = needleValue
- } else {
- o = append(o, needleValue)
- for i := len(o) - 1; i > insertCandidate; i-- {
- o[i] = o[i-1]
- }
- o[insertCandidate] = needleValue
+func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) (SectionalNeedleValue, SectionalNeedleValueExtra) {
+ return SectionalNeedleValue{
+ SectionalNeedleId(nv.Key - cs.start),
+ nv.Offset.OffsetLower,
+ nv.Size,
+ }, SectionalNeedleValueExtra{
+ nv.Offset.OffsetHigher,
}
- return o
-}
-
-func (o Overflow) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
- foundCandidate := sort.Search(len(o), func(i int) bool {
- return o[i].Key >= key
- })
- if foundCandidate != len(o) && o[foundCandidate].Key == key {
- return o[foundCandidate], true
- }
- return nv, false
-}
-
-func (snv SectionalNeedleValue) toNeedleValue(cs *CompactSection) NeedleValue {
- return NeedleValue{NeedleId(snv.Key) + cs.start, snv.Offset, snv.Size}
-}
-
-func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue {
- return SectionalNeedleValue{SectionalNeedleId(nv.Key - cs.start), nv.Offset, nv.Size}
}
diff --git a/weed/storage/needle/compact_map_test.go b/weed/storage/needle/compact_map_test.go
index 374b9ff4d..b9586ab54 100644
--- a/weed/storage/needle/compact_map_test.go
+++ b/weed/storage/needle/compact_map_test.go
@@ -98,60 +98,60 @@ func TestCompactMap(t *testing.T) {
}
func TestOverflow(t *testing.T) {
- o := Overflow(make([]SectionalNeedleValue, 0))
+ cs := NewCompactSection(1)
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 1, Offset: ToOffset(12), Size: 12})
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 2, Offset: ToOffset(12), Size: 12})
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 3, Offset: ToOffset(12), Size: 12})
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 4, Offset: ToOffset(12), Size: 12})
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 5, Offset: ToOffset(12), Size: 12})
+ cs.setOverflowEntry(1, ToOffset(12), 12)
+ cs.setOverflowEntry(2, ToOffset(12), 12)
+ cs.setOverflowEntry(3, ToOffset(12), 12)
+ cs.setOverflowEntry(4, ToOffset(12), 12)
+ cs.setOverflowEntry(5, ToOffset(12), 12)
- if o[2].Key != 3 {
- t.Fatalf("expecting o[2] has key 3: %+v", o[2].Key)
+ if cs.overflow[2].Key != 3 {
+ t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key)
}
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 3, Offset: ToOffset(24), Size: 24})
+ cs.setOverflowEntry(3, ToOffset(24), 24)
- if o[2].Key != 3 {
- t.Fatalf("expecting o[2] has key 3: %+v", o[2].Key)
+ if cs.overflow[2].Key != 3 {
+ t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key)
}
- if o[2].Size != 24 {
- t.Fatalf("expecting o[2] has size 24: %+v", o[2].Size)
+ if cs.overflow[2].Size != 24 {
+ t.Fatalf("expecting o[2] has size 24: %+v", cs.overflow[2].Size)
}
- o = o.deleteOverflowEntry(4)
+ cs.deleteOverflowEntry(4)
- if len(o) != 4 {
- t.Fatalf("expecting 4 entries now: %+v", o)
+ if len(cs.overflow) != 4 {
+ t.Fatalf("expecting 4 entries now: %+v", cs.overflow)
}
- x, _ := o.findOverflowEntry(5)
+ _, x, _ := cs.findOverflowEntry(5)
if x.Key != 5 {
t.Fatalf("expecting entry 5 now: %+v", x)
}
- for i, x := range o {
+ for i, x := range cs.overflow {
println("overflow[", i, "]:", x.Key)
}
println()
- o = o.deleteOverflowEntry(1)
+ cs.deleteOverflowEntry(1)
- for i, x := range o {
+ for i, x := range cs.overflow {
println("overflow[", i, "]:", x.Key)
}
println()
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 4, Offset: ToOffset(44), Size: 44})
- for i, x := range o {
+ cs.setOverflowEntry(4, ToOffset(44), 44)
+ for i, x := range cs.overflow {
println("overflow[", i, "]:", x.Key)
}
println()
- o = o.setOverflowEntry(SectionalNeedleValue{Key: 1, Offset: ToOffset(11), Size: 11})
+ cs.setOverflowEntry(1, ToOffset(11), 11)
- for i, x := range o {
+ for i, x := range cs.overflow {
println("overflow[", i, "]:", x.Key)
}
println()
diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go
index d23666357..86eae03ac 100644
--- a/weed/storage/types/needle_types.go
+++ b/weed/storage/types/needle_types.go
@@ -8,8 +8,15 @@ import (
)
type Offset struct {
- // b5 byte // unused
- // b4 byte // unused
+ OffsetHigher
+ OffsetLower
+}
+
+type OffsetHigher struct {
+ // b4 byte
+}
+
+type OffsetLower struct {
b3 byte
b2 byte
b1 byte
@@ -19,9 +26,9 @@ type Offset struct {
type Cookie uint32
const (
- OffsetSize = 4
+ OffsetSize = 4 // + 1
SizeSize = 4 // uint32 size
- NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize
+ NeedleEntrySize = CookieSize + NeedleIdSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8
@@ -55,21 +62,26 @@ func OffsetToBytes(bytes []byte, offset Offset) {
bytes[0] = offset.b3
}
+// only for testing, will be removed later.
func Uint32ToOffset(offset uint32) Offset {
return Offset{
- b0: byte(offset),
- b1: byte(offset >> 8),
- b2: byte(offset >> 16),
- b3: byte(offset >> 24),
+ OffsetLower: OffsetLower{
+ b0: byte(offset),
+ b1: byte(offset >> 8),
+ b2: byte(offset >> 16),
+ b3: byte(offset >> 24),
+ },
}
}
func BytesToOffset(bytes []byte) Offset {
return Offset{
- b0: bytes[3],
- b1: bytes[2],
- b2: bytes[1],
- b3: bytes[0],
+ OffsetLower: OffsetLower{
+ b0: bytes[3],
+ b1: bytes[2],
+ b2: bytes[1],
+ b3: bytes[0],
+ },
}
}