diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-03-22 00:03:16 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-03-22 00:03:16 -0700 |
| commit | b465095db10bf0f7b9214e75fe1e81ec3fc35ce3 (patch) | |
| tree | 4fc2ac53def10014766875a6bd90c601d15029a8 /weed/storage | |
| parent | df461402cccd28fc863d78aba3304893ef82f328 (diff) | |
| download | seaweedfs-b465095db10bf0f7b9214e75fe1e81ec3fc35ce3.tar.xz seaweedfs-b465095db10bf0f7b9214e75fe1e81ec3fc35ce3.zip | |
shell: add volume.check.disk to fix inconsistency for replicated volumes
fix https://github.com/chrislusf/seaweedfs/issues/1923
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/needle/needle_read_write.go | 34 | ||||
| -rw-r--r-- | weed/storage/needle_map/memdb.go | 9 | ||||
| -rw-r--r-- | weed/storage/volume_read.go | 8 | ||||
| -rw-r--r-- | weed/storage/volume_write.go | 26 |
4 files changed, 73 insertions, 4 deletions
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index e51df955e..2d7420639 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -3,13 +3,12 @@ package needle import ( "errors" "fmt" - "io" - "math" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" ) const ( @@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return offset, size, actualSize, err } +func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) { + + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.BackendStorageFile, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + + if version == Version3 { + tsOffset := NeedleHeaderSize + size + NeedleChecksumSize + util.Uint64toBytes(dataSlice[tsOffset : tsOffset+TimestampSize], appendAtNs) + } + + if err == nil { + _, err = w.WriteAt(dataSlice, int64(offset)) + } + + return + +} + func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index b25b5e89a..ba1fd3d1e 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -2,6 +2,7 @@ package needle_map import ( "fmt" + "io" "os" "github.com/syndtr/goleveldb/leveldb" @@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { } defer idxFile.Close() - return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error { + return cm.LoadFromReaderAt(idxFile) + +} + +func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) { + + return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error { if offset.IsZero() || size.IsDeleted() { return cm.Delete(key) } diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index 31dd3c784..f689eeec0 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -58,6 +58,14 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro return -1, ErrorNotFound } +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version()) +} + type VolumeFileScanner interface { VisitSuperBlock(super_block.SuperBlock) error ReadNeedleBody() bool diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index c3cb813fb..a286c5dd5 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -299,3 +299,29 @@ func (v *Volume) startWorker() { } }() } + +func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) { + return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) + } + + appendAtNs := uint64(time.Now().UnixNano()) + offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version()) + + v.checkReadWriteError(err) + if err != nil { + return err + } + v.lastAppendAtNs = appendAtNs + + // add to needle map + if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil { + glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err) + } + + return err +} |
