aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-22 00:03:16 -0700
committerChris Lu <chris.lu@gmail.com>2021-03-22 00:03:16 -0700
commitb465095db10bf0f7b9214e75fe1e81ec3fc35ce3 (patch)
tree4fc2ac53def10014766875a6bd90c601d15029a8 /weed/storage
parentdf461402cccd28fc863d78aba3304893ef82f328 (diff)
downloadseaweedfs-b465095db10bf0f7b9214e75fe1e81ec3fc35ce3.tar.xz
seaweedfs-b465095db10bf0f7b9214e75fe1e81ec3fc35ce3.zip
shell: add volume.check.disk to fix inconsistency for replicated volumes
fix https://github.com/chrislusf/seaweedfs/issues/1923
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/needle/needle_read_write.go34
-rw-r--r--weed/storage/needle_map/memdb.go9
-rw-r--r--weed/storage/volume_read.go8
-rw-r--r--weed/storage/volume_write.go26
4 files changed, 73 insertions, 4 deletions
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index e51df955e..2d7420639 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,13 +3,12 @@ package needle
import (
"errors"
"fmt"
- "io"
- "math"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
)
const (
@@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return offset, size, actualSize, err
}
+func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
+
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.BackendStorageFile, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+ }
+ }
+ }(w, end)
+ offset = uint64(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+
+ if version == Version3 {
+ tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
+ util.Uint64toBytes(dataSlice[tsOffset : tsOffset+TimestampSize], appendAtNs)
+ }
+
+ if err == nil {
+ _, err = w.WriteAt(dataSlice, int64(offset))
+ }
+
+ return
+
+}
+
func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index b25b5e89a..ba1fd3d1e 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -2,6 +2,7 @@ package needle_map
import (
"fmt"
+ "io"
"os"
"github.com/syndtr/goleveldb/leveldb"
@@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
}
defer idxFile.Close()
- return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error {
+ return cm.LoadFromReaderAt(idxFile)
+
+}
+
+func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
+
+ return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
if offset.IsZero() || size.IsDeleted() {
return cm.Delete(key)
}
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index 31dd3c784..f689eeec0 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -58,6 +58,14 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
return -1, ErrorNotFound
}
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
+}
+
type VolumeFileScanner interface {
VisitSuperBlock(super_block.SuperBlock) error
ReadNeedleBody() bool
diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go
index c3cb813fb..a286c5dd5 100644
--- a/weed/storage/volume_write.go
+++ b/weed/storage/volume_write.go
@@ -299,3 +299,29 @@ func (v *Volume) startWorker() {
}
}()
}
+
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+ return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ }
+
+ appendAtNs := uint64(time.Now().UnixNano())
+ offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
+
+ v.checkReadWriteError(err)
+ if err != nil {
+ return err
+ }
+ v.lastAppendAtNs = appendAtNs
+
+ // add to needle map
+ if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
+ glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
+ }
+
+ return err
+}