diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/needle/crc.go | 2 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read_write.go | 34 | ||||
| -rw-r--r-- | weed/storage/needle_map/memdb.go | 9 | ||||
| -rw-r--r-- | weed/storage/store.go | 3 | ||||
| -rw-r--r-- | weed/storage/volume.go | 16 | ||||
| -rw-r--r-- | weed/storage/volume_read.go | 131 | ||||
| -rw-r--r-- | weed/storage/volume_stream_write.go | 2 | ||||
| -rw-r--r-- | weed/storage/volume_write.go (renamed from weed/storage/volume_read_write.go) | 125 |
8 files changed, 208 insertions, 114 deletions
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go index 22456faa2..4476631c2 100644 --- a/weed/storage/needle/crc.go +++ b/weed/storage/needle/crc.go @@ -35,7 +35,7 @@ func NewCRCwriter(w io.Writer) *CRCwriter { return &CRCwriter{ crc: CRC(0), - w: w, + w: w, } } diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index e51df955e..16c2fd06b 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -3,13 +3,12 @@ package needle import ( "errors" "fmt" - "io" - "math" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" ) const ( @@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return offset, size, actualSize, err } +func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) { + + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.BackendStorageFile, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + + if version == Version3 { + tsOffset := NeedleHeaderSize + size + NeedleChecksumSize + util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs) + } + + if err == nil { + _, err = w.WriteAt(dataSlice, int64(offset)) + } + + return + +} + func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index b25b5e89a..ba1fd3d1e 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -2,6 +2,7 @@ package needle_map import ( "fmt" + "io" "os" "github.com/syndtr/goleveldb/leveldb" @@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { } defer idxFile.Close() - return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error { + return cm.LoadFromReaderAt(idxFile) + +} + +func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) { + + return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error { if offset.IsZero() || size.IsDeleted() { return cm.Delete(key) } diff --git a/weed/storage/store.go b/weed/storage/store.go index fb33a708c..c3507c0e2 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -217,6 +217,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { location.volumesLock.RLock() for _, v := range location.volumes { curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() + if volumeMessage == nil { + continue + } if maxFileKey < curMaxFileKey { maxFileKey = curMaxFileKey } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 366449c53..e0638d8a8 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -234,10 +234,16 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { return false } -func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) { +func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) { v.dataFileAccessLock.RLock() defer v.dataFileAccessLock.RUnlock() - glog.V(3).Infof("CollectStatus volume %d", v.Id) + glog.V(3).Infof("collectStatus volume %d", v.Id) + + if v.nm == nil { + return + } + + ok = true maxFileKey = v.nm.MaxFileKey() datFileSize, modTime, _ = v.DataBackend.GetStat() @@ -251,7 +257,11 @@ func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) { - maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus() + maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus() + + if !ok { + return 0, nil + } volumeInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go new file mode 100644 index 000000000..f689eeec0 --- /dev/null +++ b/weed/storage/volume_read.go @@ -0,0 +1,131 @@ +package storage + +import ( + "fmt" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return -1, ErrorNotFound + } + readSize := nv.Size + if readSize.IsDeleted() { + if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { + glog.V(3).Infof("reading deleted %s", n.String()) + readSize = -readSize + } else { + return -1, ErrorDeleted + } + } + if readSize == 0 { + return 0, nil + } + err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + v.checkReadWriteError(err) + if err != nil { + return 0, err + } + bytesRead := len(n.Data) + if !n.HasTtl() { + return bytesRead, nil + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) { + return bytesRead, nil + } + return -1, ErrorNotFound +} + +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version()) +} + +type VolumeFileScanner interface { + VisitSuperBlock(super_block.SuperBlock) error + ReadNeedleBody() bool + VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error +} + +func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, + needleMapKind NeedleMapKind, + volumeFileScanner VolumeFileScanner) (err error) { + var v *Volume + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { + return fmt.Errorf("failed to load volume %d: %v", id, err) + } + if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { + return fmt.Errorf("failed to process volume %d super block: %v", id, err) + } + defer v.Close() + + version := v.Version() + + offset := int64(v.SuperBlock.BlockSize()) + + return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) +} + +func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { + n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) + if e != nil { + if e == io.EOF { + return nil + } + return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) + } + for n != nil { + var needleBody []byte + if volumeFileScanner.ReadNeedleBody() { + // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest) + if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { + glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err) + // err = fmt.Errorf("cannot read needle body: %v", err) + // return + } + } + err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) + if err == io.EOF { + return nil + } + if err != nil { + glog.V(0).Infof("visit needle error: %v", err) + return fmt.Errorf("visit needle error: %v", err) + } + offset += NeedleHeaderSize + rest + glog.V(4).Infof("==> new entry offset %d", offset) + if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) + } + glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + } + return nil +} diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go index 955875aa2..d229bdf20 100644 --- a/weed/storage/volume_stream_write.go +++ b/weed/storage/volume_stream_write.go @@ -73,7 +73,7 @@ func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { sr := &StreamReader{ readerAt: v.DataBackend, - offset: nv.Offset.ToActualOffset(), + offset: nv.Offset.ToActualOffset(), } bufReader := bufio.NewReader(sr) bufReader.Discard(NeedleHeaderSize) diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_write.go index 1853e458a..a286c5dd5 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_write.go @@ -4,14 +4,12 @@ import ( "bytes" "errors" "fmt" - "io" "os" "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -146,7 +144,8 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU return } if existingNeedle.Cookie != n.Cookie { - glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) + glog.V(0).Infof("write cookie mismatch: existing %s, new %s", + needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n)) err = fmt.Errorf("mismatching cookie %x", n.Cookie) return } @@ -226,52 +225,6 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) { return 0, nil } -// read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) { - v.dataFileAccessLock.RLock() - defer v.dataFileAccessLock.RUnlock() - - nv, ok := v.nm.Get(n.Id) - if !ok || nv.Offset.IsZero() { - return -1, ErrorNotFound - } - readSize := nv.Size - if readSize.IsDeleted() { - if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { - glog.V(3).Infof("reading deleted %s", n.String()) - readSize = -readSize - } else { - return -1, ErrorDeleted - } - } - if readSize == 0 { - return 0, nil - } - err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) - if err == needle.ErrorSizeMismatch && OffsetSize == 4 { - err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) - } - v.checkReadWriteError(err) - if err != nil { - return 0, err - } - bytesRead := len(n.Data) - if !n.HasTtl() { - return bytesRead, nil - } - ttlMinutes := n.Ttl.Minutes() - if ttlMinutes == 0 { - return bytesRead, nil - } - if !n.HasLastModifiedDate() { - return bytesRead, nil - } - if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) { - return bytesRead, nil - } - return -1, ErrorNotFound -} - func (v *Volume) startWorker() { go func() { chanClosed := false @@ -347,66 +300,28 @@ func (v *Volume) startWorker() { }() } -type VolumeFileScanner interface { - VisitSuperBlock(super_block.SuperBlock) error - ReadNeedleBody() bool - VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error -} - -func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, - needleMapKind NeedleMapKind, - volumeFileScanner VolumeFileScanner) (err error) { - var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { - return fmt.Errorf("failed to load volume %d: %v", id, err) - } - if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { - return fmt.Errorf("failed to process volume %d super block: %v", id, err) - } - defer v.Close() +func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error { - version := v.Version() + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() - offset := int64(v.SuperBlock.BlockSize()) + if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) { + return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) + } - return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) -} + appendAtNs := uint64(time.Now().UnixNano()) + offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version()) -func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) - if e != nil { - if e == io.EOF { - return nil - } - return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) + v.checkReadWriteError(err) + if err != nil { + return err } - for n != nil { - var needleBody []byte - if volumeFileScanner.ReadNeedleBody() { - // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest) - if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { - glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err) - // err = fmt.Errorf("cannot read needle body: %v", err) - // return - } - } - err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) - if err == io.EOF { - return nil - } - if err != nil { - glog.V(0).Infof("visit needle error: %v", err) - return fmt.Errorf("visit needle error: %v", err) - } - offset += NeedleHeaderSize + rest - glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) - } - glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + v.lastAppendAtNs = appendAtNs + + // add to needle map + if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil { + glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err) } - return nil + + return err } |
