aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-11 22:00:41 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-11 22:00:41 -0700
commit3afa451cdc1352c00dddaf09e5c9a987ef0a5614 (patch)
tree21be048147359239959df30de6a67cfdc8156ca1
parentb530f12327ef2dbf0048c2ce1623b77144d47bb7 (diff)
downloadseaweedfs-3afa451cdc1352c00dddaf09e5c9a987ef0a5614.tar.xz
seaweedfs-3afa451cdc1352c00dddaf09e5c9a987ef0a5614.zip
volume: find a non-empty offset when binary searching by timestamp
-rw-r--r--weed/storage/volume_backup.go59
-rw-r--r--weed/storage/volume_write_test.go19
2 files changed, 60 insertions, 18 deletions
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 7aad2b971..a8ec50cad 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -194,12 +194,31 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
err = fmt.Errorf("read entry %d: %v", m, err)
return
}
- for ; offset.IsZero() && m < h; m++ {
- offset, err = v.readOffsetFromIndex(m)
- if err != nil {
- err = fmt.Errorf("read entry %d: %v", m, err)
+ if offset.IsZero() {
+ leftIndex, _, leftNs, leftErr := v.readLeftNs(m)
+ if leftErr != nil {
+ err = leftErr
+ return
+ }
+ rightIndex, rightOffset, rightNs, rightErr := v.readRightNs(m)
+ if rightErr != nil {
+ err = rightErr
return
}
+ if rightNs <= sinceNs {
+ l = rightIndex
+ if l == entryCount {
+ return Offset{}, true, nil
+ } else {
+ continue
+ }
+ }
+ if sinceNs < leftNs {
+ h = leftIndex + 1
+ continue
+ }
+ return rightOffset, false, nil
+
}
if offset.IsZero() {
return Offset{}, true, nil
@@ -230,6 +249,38 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
}
+func (v *Volume) readRightNs(m int64) (index int64, offset Offset, ts uint64, err error) {
+ index = m
+ for offset.IsZero() {
+ index++
+ offset, err = v.readOffsetFromIndex(index)
+ if err != nil {
+ err = fmt.Errorf("read entry %d: %v", index, err)
+ return
+ }
+ }
+ if !offset.IsZero() {
+ ts, err = v.readAppendAtNs(offset)
+ }
+ return
+}
+
+func (v *Volume) readLeftNs(m int64) (index int64, offset Offset, ts uint64, err error) {
+ index = m
+ for offset.IsZero() {
+ index--
+ offset, err = v.readOffsetFromIndex(index)
+ if err != nil {
+ err = fmt.Errorf("read entry %d: %v", index, err)
+ return
+ }
+ }
+ if !offset.IsZero() {
+ ts, err = v.readAppendAtNs(offset)
+ }
+ return
+}
+
// bytes is of size NeedleMapEntrySize
func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
v.dataFileAccessLock.RLock()
diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go
index e0ee3dac7..4245bdb52 100644
--- a/weed/storage/volume_write_test.go
+++ b/weed/storage/volume_write_test.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"os"
"testing"
@@ -22,7 +23,7 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
t.Fatalf("volume creation: %v", err)
}
- count := 10
+ count := 20
for i:=1;i<count;i++{
n := newRandomNeedle(uint64(i))
@@ -32,9 +33,9 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
}
}
- for i:=1;i<5;i++{
+ for i:=1;i<15;i++{
n := newEmptyNeedle(uint64(i))
- _, err := v.doDeleteRequest(n)
+ err := v.nm.Put(n.Id, types.Offset{}, types.TombstoneFileSize)
if err != nil {
t.Fatalf("delete needle %d: %v", i, err)
}
@@ -42,15 +43,12 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
ts1 := time.Now().UnixNano()
- var ts2 uint64
-
- for i:=5;i<count;i++{
+ for i:=15;i<count;i++{
n := newEmptyNeedle(uint64(i))
_, err := v.doDeleteRequest(n)
if err != nil {
t.Fatalf("delete needle %d: %v", i, err)
}
- ts2 = n.AppendAtNs
}
offset, isLast, err := v.BinarySearchByAppendAtNs(uint64(ts1))
@@ -59,11 +57,4 @@ func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
}
fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
- offset, isLast, err = v.BinarySearchByAppendAtNs(uint64(ts2))
- if err != nil {
- t.Fatalf("lookup by ts: %v", err)
- }
- fmt.Printf("offset: %v, isLast: %v\n", offset.ToActualOffset(), isLast)
-
-
} \ No newline at end of file