aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-11-27 16:18:48 -0800
committerChris Lu <chris.lu@gmail.com>2020-11-27 16:18:48 -0800
commit9ac4935f22a2e871d8933cf36ddc2cb2f85625a7 (patch)
tree760b027d26576c0dc5b6dbdc1489f7fc3f4c55f6
parent85554bea3815ffc69bed6a3d45f4e3a64c3fd48b (diff)
downloadseaweedfs-9ac4935f22a2e871d8933cf36ddc2cb2f85625a7.tar.xz
seaweedfs-9ac4935f22a2e871d8933cf36ddc2cb2f85625a7.zip
read from volume index file directly instead of open a separate file
fix https://github.com/chrislusf/seaweedfs/issues/1640 read from volume index file directly instead of open a separate file, to ensure reading latest index entries.
-rw-r--r--weed/storage/needle_map.go20
-rw-r--r--weed/storage/volume_backup.go32
2 files changed, 31 insertions, 21 deletions
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index e91856dfe..9f331267d 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -2,9 +2,11 @@ package storage
import (
"fmt"
+ "io"
"os"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -31,6 +33,7 @@ type NeedleMapper interface {
MaxFileKey() NeedleId
IndexFileSize() uint64
Sync() error
+ ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
}
type baseNeedleMapper struct {
@@ -64,3 +67,20 @@ func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size
func (nm *baseNeedleMapper) Sync() error {
return nm.indexFile.Sync()
}
+
+func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
+ bytes := make([]byte, NeedleMapEntrySize)
+ var readCount int
+ if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
+ if err == io.EOF {
+ if readCount == NeedleMapEntrySize {
+ err = nil
+ }
+ }
+ if err != nil {
+ return
+ }
+ }
+ key, offset, size = idx.IdxFileEntry(bytes)
+ return
+}
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 62004d4da..9aeb10f69 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -168,25 +168,13 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
// on server side
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
- indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
- if openErr != nil {
- err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr)
- return
- }
- defer indexFile.Close()
- fi, statErr := indexFile.Stat()
- if statErr != nil {
- err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
- return
- }
- fileSize := fi.Size()
+ fileSize := int64(v.IndexFileSize())
if fileSize%NeedleMapEntrySize != 0 {
- err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
+ err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize)
return
}
- bytes := make([]byte, NeedleMapEntrySize)
entryCount := fileSize / NeedleMapEntrySize
l := int64(0)
h := entryCount
@@ -200,7 +188,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
}
// read the appendAtNs for entry m
- offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
+ offset, err = v.readOffsetFromIndex(m)
if err != nil {
return
}
@@ -224,19 +212,21 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
return Offset{}, true, nil
}
- offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
+ offset, err = v.readOffsetFromIndex(l)
return offset, false, err
}
// bytes is of size NeedleMapEntrySize
-func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
- if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
- return Offset{}, readErr
+func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ if v.nm == nil {
+ return Offset{}, io.EOF
}
- _, offset, _ := idx.IdxFileEntry(bytes)
- return offset, nil
+ _, offset, _, err := v.nm.ReadIndexEntry(m)
+ return offset, err
}
// generate the volume idx