Diffstat (limited to 'weed/storage')
-rw-r--r--   weed/storage/needle/crc.go                                              2
-rw-r--r--   weed/storage/needle/needle_read_write.go                               34
-rw-r--r--   weed/storage/needle_map/memdb.go                                        9
-rw-r--r--   weed/storage/store.go                                                   3
-rw-r--r--   weed/storage/volume.go                                                 16
-rw-r--r--   weed/storage/volume_read.go                                           131
-rw-r--r--   weed/storage/volume_stream_write.go                                     2
-rw-r--r--   weed/storage/volume_write.go (renamed from weed/storage/volume_read_write.go)  125
8 files changed, 208 insertions, 114 deletions
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 22456faa2..4476631c2 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -35,7 +35,7 @@ func NewCRCwriter(w io.Writer) *CRCwriter {
return &CRCwriter{
crc: CRC(0),
-		w: w,
+		w:   w,
}
}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index e51df955e..16c2fd06b 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,13 +3,12 @@ package needle
import (
"errors"
"fmt"
- "io"
- "math"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
)
const (
@@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return offset, size, actualSize, err
}
+func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
+
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.BackendStorageFile, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+ }
+ }
+ }(w, end)
+ offset = uint64(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+
+ if version == Version3 {
+ tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
+ util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
+ }
+
+ if err == nil {
+ _, err = w.WriteAt(dataSlice, int64(offset))
+ }
+
+ return
+
+}
+
func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
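
(Illustrative sketch, not part of this diff.) The new needle.WriteNeedleBlob pairs naturally with the existing needle.ReadNeedleBlob: a raw blob read from one backend file can be appended verbatim to another, with the Version3 append timestamp re-stamped in place. The helper name copyNeedleBlob below is hypothetical.

package example

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

// copyNeedleBlob reads a full needle blob (header+data+checksum+padding) from src
// and appends it to the current end of dst, assuming offset/size/version come
// from the source volume's needle map.
func copyNeedleBlob(src, dst backend.BackendStorageFile, offset int64, size types.Size, version needle.Version) error {
	blob, err := needle.ReadNeedleBlob(src, offset, size, version)
	if err != nil {
		return err
	}
	// For Version3, WriteNeedleBlob overwrites the timestamp bytes inside the blob
	// with appendAtNs before writing, and truncates dst back on write failure.
	_, err = needle.WriteNeedleBlob(dst, blob, size, uint64(time.Now().UnixNano()), version)
	return err
}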
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index b25b5e89a..ba1fd3d1e 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -2,6 +2,7 @@ package needle_map
import (
"fmt"
+ "io"
"os"
"github.com/syndtr/goleveldb/leveldb"
@@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
}
defer idxFile.Close()
- return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error {
+ return cm.LoadFromReaderAt(idxFile)
+
+}
+
+func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
+
+ return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
if offset.IsZero() || size.IsDeleted() {
return cm.Delete(key)
}
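
(Illustrative sketch, not part of this diff.) LoadFromReaderAt decouples index loading from the filesystem: any io.ReaderAt works, for example a bytes.Reader over index data already held in memory. The helper name loadIdxFromMemory is hypothetical.

package example

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
)

// loadIdxFromMemory builds a MemDb from .idx bytes without writing a temp file,
// since bytes.Reader satisfies io.ReaderAt.
func loadIdxFromMemory(idxBytes []byte) (*needle_map.MemDb, error) {
	cm := needle_map.NewMemDb()
	if err := cm.LoadFromReaderAt(bytes.NewReader(idxBytes)); err != nil {
		cm.Close()
		return nil, err
	}
	return cm, nil
}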
diff --git a/weed/storage/store.go b/weed/storage/store.go
index fb33a708c..c3507c0e2 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -217,6 +217,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
+ if volumeMessage == nil {
+ continue
+ }
if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 366449c53..e0638d8a8 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -234,10 +234,16 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
return false
}
-func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) {
+func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
- glog.V(3).Infof("CollectStatus volume %d", v.Id)
+ glog.V(3).Infof("collectStatus volume %d", v.Id)
+
+ if v.nm == nil {
+ return
+ }
+
+ ok = true
maxFileKey = v.nm.MaxFileKey()
datFileSize, modTime, _ = v.DataBackend.GetStat()
@@ -251,7 +257,11 @@ func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64,
func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
- maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus()
+ maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus()
+
+ if !ok {
+ return 0, nil
+ }
volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
new file mode 100644
index 000000000..f689eeec0
--- /dev/null
+++ b/weed/storage/volume_read.go
@@ -0,0 +1,131 @@
+package storage
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return -1, ErrorNotFound
+ }
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ glog.V(3).Infof("reading deleted %s", n.String())
+ readSize = -readSize
+ } else {
+ return -1, ErrorDeleted
+ }
+ }
+ if readSize == 0 {
+ return 0, nil
+ }
+ err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ v.checkReadWriteError(err)
+ if err != nil {
+ return 0, err
+ }
+ bytesRead := len(n.Data)
+ if !n.HasTtl() {
+ return bytesRead, nil
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
+ return bytesRead, nil
+ }
+ return -1, ErrorNotFound
+}
+
+// ReadNeedleBlob reads the raw needle blob stored at the given offset and size
+func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
+}
+
+type VolumeFileScanner interface {
+ VisitSuperBlock(super_block.SuperBlock) error
+ ReadNeedleBody() bool
+ VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
+}
+
+func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
+ needleMapKind NeedleMapKind,
+ volumeFileScanner VolumeFileScanner) (err error) {
+ var v *Volume
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+ return fmt.Errorf("failed to load volume %d: %v", id, err)
+ }
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+ }
+ defer v.Close()
+
+ version := v.Version()
+
+ offset := int64(v.SuperBlock.BlockSize())
+
+ return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
+}
+
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+ n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
+ if e != nil {
+ if e == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+ }
+ for n != nil {
+ var needleBody []byte
+ if volumeFileScanner.ReadNeedleBody() {
+ // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
+ if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
+ glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
+ // err = fmt.Errorf("cannot read needle body: %v", err)
+ // return
+ }
+ }
+ err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ glog.V(0).Infof("visit needle error: %v", err)
+ return fmt.Errorf("visit needle error: %v", err)
+ }
+ offset += NeedleHeaderSize + rest
+ glog.V(4).Infof("==> new entry offset %d", offset)
+ if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
+ }
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ }
+ return nil
+}
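
(Illustrative sketch, not part of this diff.) A minimal VolumeFileScanner implementation driven by the relocated ScanVolumeFile; needleCounter and countNeedles are hypothetical names, and NeedleMapInMemory is assumed to be the usual in-memory needle map kind.

package example

import (
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

// needleCounter counts needle entries while walking a volume data file.
type needleCounter struct {
	count int
}

func (s *needleCounter) VisitSuperBlock(super_block.SuperBlock) error { return nil }
func (s *needleCounter) ReadNeedleBody() bool                         { return false } // headers are enough for counting
func (s *needleCounter) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	s.count++
	return nil
}

func countNeedles(dir, collection string, vid needle.VolumeId) (int, error) {
	scanner := &needleCounter{}
	err := storage.ScanVolumeFile(dir, collection, vid, storage.NeedleMapInMemory, scanner)
	return scanner.count, err
}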
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
index 955875aa2..d229bdf20 100644
--- a/weed/storage/volume_stream_write.go
+++ b/weed/storage/volume_stream_write.go
@@ -73,7 +73,7 @@ func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
sr := &StreamReader{
readerAt: v.DataBackend,
-		offset: nv.Offset.ToActualOffset(),
+		offset:   nv.Offset.ToActualOffset(),
}
bufReader := bufio.NewReader(sr)
bufReader.Discard(NeedleHeaderSize)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_write.go
index 1853e458a..a286c5dd5 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_write.go
@@ -4,14 +4,12 @@ import (
"bytes"
"errors"
"fmt"
- "io"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -146,7 +144,8 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
return
}
if existingNeedle.Cookie != n.Cookie {
- glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
+ glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
+ needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
return
}
@@ -226,52 +225,6 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
return 0, nil
}
-// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
- v.dataFileAccessLock.RLock()
- defer v.dataFileAccessLock.RUnlock()
-
- nv, ok := v.nm.Get(n.Id)
- if !ok || nv.Offset.IsZero() {
- return -1, ErrorNotFound
- }
- readSize := nv.Size
- if readSize.IsDeleted() {
- if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
- glog.V(3).Infof("reading deleted %s", n.String())
- readSize = -readSize
- } else {
- return -1, ErrorDeleted
- }
- }
- if readSize == 0 {
- return 0, nil
- }
- err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
- if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
- err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
- }
- v.checkReadWriteError(err)
- if err != nil {
- return 0, err
- }
- bytesRead := len(n.Data)
- if !n.HasTtl() {
- return bytesRead, nil
- }
- ttlMinutes := n.Ttl.Minutes()
- if ttlMinutes == 0 {
- return bytesRead, nil
- }
- if !n.HasLastModifiedDate() {
- return bytesRead, nil
- }
- if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
- return bytesRead, nil
- }
- return -1, ErrorNotFound
-}
-
func (v *Volume) startWorker() {
go func() {
chanClosed := false
@@ -347,66 +300,28 @@ func (v *Volume) startWorker() {
}()
}
-type VolumeFileScanner interface {
- VisitSuperBlock(super_block.SuperBlock) error
- ReadNeedleBody() bool
- VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
-}
-
-func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
- needleMapKind NeedleMapKind,
- volumeFileScanner VolumeFileScanner) (err error) {
- var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
- return fmt.Errorf("failed to load volume %d: %v", id, err)
- }
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
- }
- defer v.Close()
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
- version := v.Version()
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
- offset := int64(v.SuperBlock.BlockSize())
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+ return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ }
- return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
-}
+ appendAtNs := uint64(time.Now().UnixNano())
+ offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
-func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
- if e != nil {
- if e == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+ v.checkReadWriteError(err)
+ if err != nil {
+ return err
}
- for n != nil {
- var needleBody []byte
- if volumeFileScanner.ReadNeedleBody() {
- // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
- if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
- // err = fmt.Errorf("cannot read needle body: %v", err)
- // return
- }
- }
- err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- glog.V(0).Infof("visit needle error: %v", err)
- return fmt.Errorf("visit needle error: %v", err)
- }
- offset += NeedleHeaderSize + rest
- glog.V(4).Infof("==> new entry offset %d", offset)
- if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
- }
- glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ v.lastAppendAtNs = appendAtNs
+
+ // add to needle map
+ if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
+ glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
}
- return nil
+
+ return err
}
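
(Illustrative sketch, not part of this diff.) Volume.ReadNeedleBlob and the new Volume.WriteNeedleBlob together let a raw needle blob be replayed onto another volume while keeping its needle map in sync; replicateNeedle is a hypothetical helper.

package example

import (
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

// replicateNeedle copies one needle, identified by its offset/size in the source
// volume, onto the destination volume. WriteNeedleBlob appends the blob and
// records the new offset for id in the destination's needle map.
func replicateNeedle(src, dst *storage.Volume, id types.NeedleId, offset int64, size types.Size) error {
	blob, err := src.ReadNeedleBlob(offset, size)
	if err != nil {
		return err
	}
	return dst.WriteNeedleBlob(id, blob, size)
}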