diff options
Diffstat (limited to 'weed/storage/volume_read_write.go')
| -rw-r--r-- | weed/storage/volume_read_write.go | 31 |
1 files changed, 16 insertions, 15 deletions
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 7c9539021..f2b17a827 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -9,6 +9,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" . "github.com/chrislusf/seaweedfs/weed/storage/types" @@ -26,7 +27,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { nv, ok := v.nm.Get(n.Id) if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { oldNeedle := new(needle.Needle) - err := oldNeedle.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) + err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err) return false @@ -45,10 +46,10 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } - mMap, exists := memory_map.FileMemoryMap[v.dataFile.Name()] + mMap, exists := memory_map.FileMemoryMap[v.DataBackend.String()] if exists { mMap.DeleteFileAndMemoryMap() - delete(memory_map.FileMemoryMap, v.dataFile.Name()) + delete(memory_map.FileMemoryMap, v.DataBackend.String()) } v.Close() @@ -64,7 +65,7 @@ func (v *Volume) Destroy() (err error) { func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { - err = fmt.Errorf("%s is read-only", v.dataFile.Name()) + err = fmt.Errorf("%s is read-only", v.DataBackend.String()) return } v.dataFileAccessLock.Lock() @@ -83,7 +84,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn // check whether existing needle cookie matches nv, ok := v.nm.Get(n.Id) if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset()) + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset()) if existingNeedleReadErr != nil { err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) return @@ -97,7 +98,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) - if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { + if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { return } v.lastAppendAtNs = n.AppendAtNs @@ -117,7 +118,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) if v.readOnly { - return 0, fmt.Errorf("%s is read-only", v.dataFile.Name()) + return 0, fmt.Errorf("%s is read-only", v.DataBackend.String()) } v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() @@ -127,7 +128,7 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { size := nv.Size n.Data = nil n.AppendAtNs = uint64(time.Now().UnixNano()) - offset, _, _, err := n.Append(v.dataFile, v.Version()) + offset, _, _, err := n.Append(v.DataBackend, v.Version()) if err != nil { return size, err } @@ -155,7 +156,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) { if nv.Size == 0 { return 0, nil } - err := n.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) + err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { return 0, err } @@ -198,21 +199,21 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, offset := int64(v.SuperBlock.BlockSize()) - return ScanVolumeFileFrom(version, v.dataFile, offset, volumeFileScanner) + return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) } -func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset) +func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) { + n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) if e != nil { if e == io.EOF { return nil } - return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e) + return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.String(), offset, e) } for n != nil { var needleBody []byte if volumeFileScanner.ReadNeedleBody() { - if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil { + if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { glog.V(0).Infof("cannot read needle body: %v", err) //err = fmt.Errorf("cannot read needle body: %v", err) //return @@ -228,7 +229,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } offset += NeedleHeaderSize + rest glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil { + if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { if err == io.EOF { return nil } |
