diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2025-06-05 23:03:29 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-05 14:03:29 -0700 |
| commit | bed0a64693cdfd9bbcd11ad6f813fbd35af08587 (patch) | |
| tree | 031e4fdeb894bfdc538f90f3492fa648ff293b12 /weed/storage/needle_map/old | |
| parent | d8ddc22fc24bc22a87a8731689bb0f3776e8c4fc (diff) | |
| download | seaweedfs-bed0a64693cdfd9bbcd11ad6f813fbd35af08587.tar.xz seaweedfs-bed0a64693cdfd9bbcd11ad6f813fbd35af08587.zip | |
New `needle_map.CompactMap()` implementation for reduced memory usage (#6842)
* Rework `needle_map.CompactMap()` to maximize memory efficiency.
* Use a memory-efficient structure for `CompactMap` needle value entries.
This slightly complicates the code, but makes a **massive** difference
in memory efficiency - preliminary results show a ~30% reduction in
heap usage, with no measurable performance impact otherwise.
* Clean up type for `CompactMap` chunk IDs.
* Add a small comment description for `CompactMap()`.
* Add the old version of `CompactMap()` for comparison purposes.
Diffstat (limited to 'weed/storage/needle_map/old')
| -rw-r--r-- | weed/storage/needle_map/old/compact_map.go | 332 | ||||
| -rw-r--r-- | weed/storage/needle_map/old/compact_map_perf_test.go | 92 | ||||
| -rw-r--r-- | weed/storage/needle_map/old/compact_map_test.go | 243 |
3 files changed, 667 insertions, 0 deletions
diff --git a/weed/storage/needle_map/old/compact_map.go b/weed/storage/needle_map/old/compact_map.go new file mode 100644 index 000000000..ca9892b0f --- /dev/null +++ b/weed/storage/needle_map/old/compact_map.go @@ -0,0 +1,332 @@ +package needle_map + +import ( + "sort" + "sync" + + . "github.com/seaweedfs/seaweedfs/weed/storage/types" + + new_map "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" +) + +const ( + MaxSectionBucketSize = 1024 * 8 + LookBackWindowSize = 1024 // how many entries to look back when inserting into a section +) + +type SectionalNeedleId uint32 + +const SectionalNeedleIdLimit = 1<<32 - 1 + +type SectionalNeedleValue struct { + Key SectionalNeedleId + OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Size Size `comment:"Size of the data portion"` + OffsetHigher OffsetHigher +} + +type CompactSection struct { + sync.RWMutex + values []SectionalNeedleValue + overflow Overflow + start NeedleId + end NeedleId +} + +type Overflow []SectionalNeedleValue + +func NewCompactSection(start NeedleId) *CompactSection { + return &CompactSection{ + values: make([]SectionalNeedleValue, 0), + overflow: Overflow(make([]SectionalNeedleValue, 0)), + start: start, + } +} + +// return old entry size +func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { + cs.Lock() + defer cs.Unlock() + + if key > cs.end { + cs.end = key + } + skey := SectionalNeedleId(key - cs.start) + if i := cs.binarySearchValues(skey); i >= 0 { + // update + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size + cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size + return + } + + var lkey SectionalNeedleId + if len(cs.values) > 0 { + lkey = cs.values[len(cs.values)-1].Key + } + + hasAdded := false + switch { + case len(cs.values) < MaxSectionBucketSize && lkey <= skey: + // non-overflow insert + cs.values = append(cs.values, SectionalNeedleValue{ + Key: skey, + OffsetLower: offset.OffsetLower, + Size: size, + OffsetHigher: offset.OffsetHigher, + }) + hasAdded = true + case len(cs.values) < MaxSectionBucketSize: + // still has capacity and only partially out of order + lookBackIndex := len(cs.values) - LookBackWindowSize + if lookBackIndex < 0 { + lookBackIndex = 0 + } + if cs.values[lookBackIndex].Key <= skey { + for ; lookBackIndex < len(cs.values); lookBackIndex++ { + if cs.values[lookBackIndex].Key >= skey { + break + } + } + cs.values = append(cs.values, SectionalNeedleValue{}) + copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:]) + cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size + cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher + hasAdded = true + } + } + + // overflow insert + if !hasAdded { + if oldValue, found := cs.findOverflowEntry(skey); found { + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size + } + cs.setOverflowEntry(skey, offset, size) + } else { + // if we maxed out our values bucket, pin its capacity to minimize memory usage + if len(cs.values) == MaxSectionBucketSize { + bucket := make([]SectionalNeedleValue, len(cs.values)) + copy(bucket, cs.values) + cs.values = bucket + } + } + + return +} + +func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) { + needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size, OffsetHigher: offset.OffsetHigher} + insertCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= needleValue.Key + }) + + if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key { + cs.overflow[insertCandidate] = needleValue + return + } + + cs.overflow = append(cs.overflow, SectionalNeedleValue{}) + copy(cs.overflow[insertCandidate+1:], cs.overflow[insertCandidate:]) + cs.overflow[insertCandidate] = needleValue +} + +func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) { + foundCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= key + }) + if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key { + return cs.overflow[foundCandidate], true + } + return nv, false +} + +func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { + length := len(cs.overflow) + deleteCandidate := sort.Search(length, func(i int) bool { + return cs.overflow[i].Key >= key + }) + if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { + if cs.overflow[deleteCandidate].Size.IsValid() { + cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size + } + } +} + +// return old entry size +func (cs *CompactSection) Delete(key NeedleId) Size { + cs.Lock() + defer cs.Unlock() + ret := Size(0) + if key > cs.end { + return ret + } + skey := SectionalNeedleId(key - cs.start) + if i := cs.binarySearchValues(skey); i >= 0 { + if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() { + ret = cs.values[i].Size + cs.values[i].Size = -cs.values[i].Size + } + } + if v, found := cs.findOverflowEntry(skey); found { + cs.deleteOverflowEntry(skey) + ret = v.Size + } + return ret +} +func (cs *CompactSection) Get(key NeedleId) (*new_map.NeedleValue, bool) { + cs.RLock() + defer cs.RUnlock() + if key > cs.end { + return nil, false + } + skey := SectionalNeedleId(key - cs.start) + if v, ok := cs.findOverflowEntry(skey); ok { + nv := toNeedleValue(v, cs) + return &nv, true + } + if i := cs.binarySearchValues(skey); i >= 0 { + nv := toNeedleValue(cs.values[i], cs) + return &nv, true + } + return nil, false +} +func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int { + x := sort.Search(len(cs.values), func(i int) bool { + return cs.values[i].Key >= key + }) + if x >= len(cs.values) { + return -1 + } + if cs.values[x].Key > key { + return -2 + } + return x +} + +// This map assumes mostly inserting increasing keys +// This map assumes mostly inserting increasing keys +type CompactMap struct { + list []*CompactSection +} + +func NewCompactMap() *CompactMap { + return &CompactMap{} +} + +func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { + x := cm.binarySearchCompactSection(key) + if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit { + // println(x, "adding to existing", len(cm.list), "sections, starting", key) + cs := NewCompactSection(key) + cm.list = append(cm.list, cs) + x = len(cm.list) - 1 + //keep compact section sorted by start + for x >= 0 { + if x > 0 && cm.list[x-1].start > key { + cm.list[x] = cm.list[x-1] + // println("shift", x, "start", cs.start, "to", x-1) + x = x - 1 + } else { + cm.list[x] = cs + // println("cs", x, "start", cs.start) + break + } + } + } + // println(key, "set to section[", x, "].start", cm.list[x].start) + return cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key NeedleId) Size { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return Size(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key NeedleId) (*new_map.NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end { + return h + } + return -4 + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +// Visit visits all entries or stop if any error when visiting +func (cm *CompactMap) AscendingVisit(visit func(new_map.NeedleValue) error) error { + for _, cs := range cm.list { + cs.RLock() + var i, j int + for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); { + if cs.overflow[i].Key < cs.values[j].Key { + if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { + cs.RUnlock() + return err + } + i++ + } else if cs.overflow[i].Key == cs.values[j].Key { + j++ + } else { + if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { + cs.RUnlock() + return err + } + j++ + } + } + for ; i < len(cs.overflow); i++ { + if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil { + cs.RUnlock() + return err + } + } + for ; j < len(cs.values); j++ { + if err := visit(toNeedleValue(cs.values[j], cs)); err != nil { + cs.RUnlock() + return err + } + } + cs.RUnlock() + } + return nil +} + +func toNeedleValue(snv SectionalNeedleValue, cs *CompactSection) new_map.NeedleValue { + offset := Offset{ + OffsetHigher: snv.OffsetHigher, + OffsetLower: snv.OffsetLower, + } + return new_map.NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size} +} + +func toSectionalNeedleValue(nv new_map.NeedleValue, cs *CompactSection) SectionalNeedleValue { + return SectionalNeedleValue{ + Key: SectionalNeedleId(nv.Key - cs.start), + OffsetLower: nv.Offset.OffsetLower, + Size: nv.Size, + OffsetHigher: nv.Offset.OffsetHigher, + } +} diff --git a/weed/storage/needle_map/old/compact_map_perf_test.go b/weed/storage/needle_map/old/compact_map_perf_test.go new file mode 100644 index 000000000..4728930db --- /dev/null +++ b/weed/storage/needle_map/old/compact_map_perf_test.go @@ -0,0 +1,92 @@ +package needle_map + +import ( + "fmt" + "log" + "os" + "runtime" + "testing" + "time" + + . "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +/* + +To see the memory usage: + +go test -run TestMemoryUsage +The Alloc section shows the in-use memory increase for each iteration. + +go test -run TestMemoryUsage -memprofile=mem.out +go tool pprof --alloc_space needle.test mem.out + + +*/ + +func TestMemoryUsage(t *testing.T) { + + var maps []*CompactMap + totalRowCount := uint64(0) + + startTime := time.Now() + for i := 0; i < 10; i++ { + indexFile, ie := os.OpenFile("../../../../test/data/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + m, rowCount := loadNewNeedleMap(indexFile) + maps = append(maps, m) + totalRowCount += rowCount + + indexFile.Close() + + PrintMemUsage(totalRowCount) + now := time.Now() + fmt.Printf("\tTaken = %v\n", now.Sub(startTime)) + startTime = now + } + +} + +func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) { + m := NewCompactMap() + bytes := make([]byte, NeedleMapEntrySize) + rowCount := uint64(0) + count, e := file.Read(bytes) + for count > 0 && e == nil { + for i := 0; i < count; i += NeedleMapEntrySize { + rowCount++ + key := BytesToNeedleId(bytes[i : i+NeedleIdSize]) + offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize]) + size := BytesToSize(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize]) + + if !offset.IsZero() { + m.Set(NeedleId(key), offset, size) + } else { + m.Delete(key) + } + } + + count, e = file.Read(bytes) + } + + return m, rowCount + +} + +func PrintMemUsage(totalRowCount uint64) { + + runtime.GC() + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Each %.02f Bytes", float64(m.Alloc)/float64(totalRowCount)) + fmt.Printf("\tAlloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v", m.NumGC) +} +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +} diff --git a/weed/storage/needle_map/old/compact_map_test.go b/weed/storage/needle_map/old/compact_map_test.go new file mode 100644 index 000000000..d2d0f1569 --- /dev/null +++ b/weed/storage/needle_map/old/compact_map_test.go @@ -0,0 +1,243 @@ +package needle_map + +import ( + "fmt" + "log" + "os" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/sequence" + . "github.com/seaweedfs/seaweedfs/weed/storage/types" + + new_map "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" +) + +func TestSnowflakeSequencer(t *testing.T) { + m := NewCompactMap() + seq, _ := sequence.NewSnowflakeSequencer("for_test", 1) + + for i := 0; i < 200000; i++ { + id := seq.NextFileId(1) + oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073) + if oldSize != 0 { + t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize) + } + } + +} + +func TestOverflow2(t *testing.T) { + m := NewCompactMap() + _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 0 { + t.Fatalf("expecting no previous data") + } + _, oldSize = m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 3000073 { + t.Fatalf("expecting previous data size is %d, not %d", 3000073, oldSize) + } + m.Set(NeedleId(150073), ToOffset(8), 3000073) + m.Set(NeedleId(150089), ToOffset(8), 3000073) + m.Set(NeedleId(150076), ToOffset(8), 3000073) + m.Set(NeedleId(150124), ToOffset(8), 3000073) + m.Set(NeedleId(150137), ToOffset(8), 3000073) + m.Set(NeedleId(150147), ToOffset(8), 3000073) + m.Set(NeedleId(150145), ToOffset(8), 3000073) + m.Set(NeedleId(150158), ToOffset(8), 3000073) + m.Set(NeedleId(150162), ToOffset(8), 3000073) + + m.AscendingVisit(func(value new_map.NeedleValue) error { + println("needle key:", value.Key) + return nil + }) +} + +func TestIssue52(t *testing.T) { + m := NewCompactMap() + m.Set(NeedleId(10002), ToOffset(10002), 10002) + if element, ok := m.Get(NeedleId(10002)); ok { + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) + } + m.Set(NeedleId(10001), ToOffset(10001), 10001) + if element, ok := m.Get(NeedleId(10002)); ok { + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) + } else { + t.Fatal("key 10002 missing after setting 10001") + } +} + +func TestCompactMap(t *testing.T) { + m := NewCompactMap() + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 2 { + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) + } + + for i := uint32(0); i < 100*MaxSectionBucketSize; i += 37 { + m.Delete(NeedleId(i)) + } + + for i := uint32(0); i < 10*MaxSectionBucketSize; i += 3 { + m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5)) + } + + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // glog.V(4).Infoln(i, "=", v.Key, v.Offset, v.Size) + // } + // } + + for i := uint32(0); i < 10*MaxSectionBucketSize; i++ { + v, ok := m.Get(NeedleId(i)) + if i%3 == 0 { + if !ok { + t.Fatal("key", i, "missing!") + } + if v.Size != Size(i+5) { + t.Fatal("key", i, "size", v.Size) + } + } else if i%37 == 0 { + if ok && v.Size.IsValid() { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v.Size != Size(i) { + t.Fatal("key", i, "size", v.Size) + } + } + } + + for i := uint32(10 * MaxSectionBucketSize); i < 100*MaxSectionBucketSize; i++ { + v, ok := m.Get(NeedleId(i)) + if i%37 == 0 { + if ok && v.Size.IsValid() { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v == nil { + t.Fatal("key", i, "missing") + } + if v.Size != Size(i) { + t.Fatal("key", i, "size", v.Size) + } + } + } + +} + +func TestOverflow(t *testing.T) { + cs := NewCompactSection(1) + + cs.setOverflowEntry(1, ToOffset(12), 12) + cs.setOverflowEntry(2, ToOffset(12), 12) + cs.setOverflowEntry(3, ToOffset(12), 12) + cs.setOverflowEntry(4, ToOffset(12), 12) + cs.setOverflowEntry(5, ToOffset(12), 12) + + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) + } + + cs.setOverflowEntry(3, ToOffset(24), 24) + + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) + } + + if cs.overflow[2].Size != 24 { + t.Fatalf("expecting o[2] has size 24: %+v", cs.overflow[2].Size) + } + + cs.deleteOverflowEntry(4) + + if len(cs.overflow) != 5 { + t.Fatalf("expecting 5 entries now: %+v", cs.overflow) + } + + x, _ := cs.findOverflowEntry(5) + if x.Key != 5 { + t.Fatalf("expecting entry 5 now: %+v", x) + } + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + + cs.deleteOverflowEntry(1) + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key, "size", x.Size) + } + println() + + cs.setOverflowEntry(4, ToOffset(44), 44) + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + + cs.setOverflowEntry(1, ToOffset(11), 11) + + for i, x := range cs.overflow { + println("overflow[", i, "]:", x.Key) + } + println() + +} + +func TestCompactSection_Get(t *testing.T) { + var maps []*CompactMap + totalRowCount := uint64(0) + indexFile, ie := os.OpenFile("../../../../test/data/sample.idx", + os.O_RDWR|os.O_RDONLY, 0644) + defer indexFile.Close() + if ie != nil { + log.Fatalln(ie) + } + + m, rowCount := loadNewNeedleMap(indexFile) + maps = append(maps, m) + totalRowCount += rowCount + m.Set(1574318345753513987, ToOffset(10002), 10002) + nv, ok := m.Get(1574318345753513987) + if ok { + t.Log(uint64(nv.Key)) + } + + nv1, ok := m.Get(1574318350048481283) + if ok { + t.Error(uint64(nv1.Key)) + } + + m.Set(1574318350048481283, ToOffset(10002), 10002) + nv2, ok1 := m.Get(1574318350048481283) + if ok1 { + t.Log(uint64(nv2.Key)) + } + + m.Delete(nv2.Key) + nv3, has := m.Get(nv2.Key) + if has && nv3.Size > 0 { + t.Error(uint64(nv3.Size)) + } +} + +// Test after putting 1 ~ LookBackWindowSize*3 items in sequential order, but missing item LookBackWindowSize +// insert the item LookBackWindowSize in the middle of the sequence +func TestCompactSection_PutOutOfOrderItemBeyondLookBackWindow(t *testing.T) { + m := NewCompactMap() + + // put 1 ~ 10 + for i := 1; i <= LookBackWindowSize*3; i++ { + if i != LookBackWindowSize { + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) + } + } + + m.Set(NeedleId(LookBackWindowSize), ToOffset(int64(LookBackWindowSize)), Size(LookBackWindowSize)) + + // check if 8 is in the right place + if v, ok := m.Get(NeedleId(LookBackWindowSize)); !ok || v.Offset != ToOffset(LookBackWindowSize) || v.Size != Size(LookBackWindowSize) { + t.Fatalf("expected to find LookBackWindowSize at offset %d with size %d, but got %v", LookBackWindowSize, LookBackWindowSize, v) + } +} |
