aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/volume_read_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/volume_read_write.go')
-rw-r--r--weed/storage/volume_read_write.go39
1 files changed, 36 insertions, 3 deletions
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 363835eb9..50ea8fecb 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -195,7 +195,7 @@ func ScanVolumeFile(dirname string, collection string, id VolumeId,
}
func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, rest, e := ReadNeedleHeader(dataFile, version, offset)
+ n, _, rest, e := ReadNeedleHeader(dataFile, version, offset)
if e != nil {
if e == io.EOF {
return nil
@@ -204,7 +204,7 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume
}
for n != nil {
if volumeFileScanner.ReadNeedleBody() {
- if err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil {
+ if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
@@ -219,7 +219,40 @@ func ScanVolumeFileFrom(version Version, dataFile *os.File, offset int64, volume
}
offset += NeedleEntrySize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
- if n, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil {
+ if n, _, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
+ }
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ }
+ return nil
+}
+
+func ScanVolumeFileNeedleFrom(version Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) {
+ n, nh, rest, e := ReadNeedleHeader(dataFile, version, offset)
+ if e != nil {
+ if e == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
+ }
+ for n != nil {
+ var needleBody []byte
+ if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleEntrySize, rest); err != nil {
+ glog.V(0).Infof("cannot read needle body: %v", err)
+ //err = fmt.Errorf("cannot read needle body: %v", err)
+ //return
+ }
+ err = fn(nh, needleBody, n.AppendAtNs)
+ if err != nil {
+ glog.V(0).Infof("visit needle error: %v", err)
+ return
+ }
+ offset += NeedleEntrySize + rest
+ glog.V(4).Infof("==> new entry offset %d", offset)
+ if n, nh, rest, err = ReadNeedleHeader(dataFile, version, offset); err != nil {
if err == io.EOF {
return nil
}