aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/backend/backend.go6
-rw-r--r--weed/storage/backend/disk_file.go2
-rw-r--r--weed/storage/backend/s3_backend/s3_sessions.go5
-rw-r--r--weed/storage/disk_location.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume.go2
-rw-r--r--weed/storage/needle/crc.go23
-rw-r--r--weed/storage/needle/needle_read_write.go36
-rw-r--r--weed/storage/needle_map/memdb.go9
-rw-r--r--weed/storage/needle_map_leveldb.go6
-rw-r--r--weed/storage/needle_map_sorted_file.go8
-rw-r--r--weed/storage/store.go24
-rw-r--r--weed/storage/store_ec.go3
-rw-r--r--weed/storage/super_block/replica_placement.go3
-rw-r--r--weed/storage/volume.go16
-rw-r--r--weed/storage/volume_loading.go10
-rw-r--r--weed/storage/volume_read.go131
-rw-r--r--weed/storage/volume_stream_write.go104
-rw-r--r--weed/storage/volume_tier.go13
-rw-r--r--weed/storage/volume_vacuum.go2
-rw-r--r--weed/storage/volume_write.go (renamed from weed/storage/volume_read_write.go)185
20 files changed, 400 insertions, 190 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index b8b883be6..2dc61d02e 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -58,6 +58,9 @@ func LoadConfiguration(config *util.ViperProxy) {
if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
continue
}
+ if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found {
+ continue
+ }
backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
if buildErr != nil {
@@ -81,6 +84,9 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
glog.Warningf("storage type %s not found", storageBackend.Type)
continue
}
+ if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found {
+ continue
+ }
backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
if buildErr != nil {
glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 498963c31..3b42429cf 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
return
}
-func (df *DiskFile) Append(p []byte) (n int, err error) {
+func (df *DiskFile) Write(p []byte) (n int, err error) {
return df.WriteAt(p, df.fileSize)
}
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index e2fdf1eb6..b8378c379 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -34,8 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string)
}
config := &aws.Config{
- Region: aws.String(region),
- Endpoint: aws.String(endpoint),
+ Region: aws.String(region),
+ Endpoint: aws.String(endpoint),
+ S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 6de87c793..ed4e00312 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -131,7 +131,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
l.SetVolume(vid, v)
size, _, _ := v.FileStat()
- glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
+ glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
return true
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 85d6a5fc8..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -63,7 +63,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
+ if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 6fd910bb7..4476631c2 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -2,6 +2,7 @@ package needle
import (
"fmt"
+ "io"
"github.com/klauspost/crc32"
@@ -29,3 +30,25 @@ func (n *Needle) Etag() string {
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
}
+
+func NewCRCwriter(w io.Writer) *CRCwriter {
+
+ return &CRCwriter{
+ crc: CRC(0),
+ w: w,
+ }
+
+}
+
+type CRCwriter struct {
+ crc CRC
+ w io.Writer
+}
+
+func (c *CRCwriter) Write(p []byte) (n int, err error) {
+ n, err = c.w.Write(p) // with each write ...
+ c.crc = c.crc.Update(p)
+ return
+}
+
+func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 0f72bc0bb..16c2fd06b 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,13 +3,12 @@ package needle
import (
"errors"
"fmt"
- "io"
- "math"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
)
const (
@@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return offset, size, actualSize, err
}
+func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
+
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.BackendStorageFile, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+ }
+ }
+ }(w, end)
+ offset = uint64(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+
+ if version == Version3 {
+ tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
+ util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
+ }
+
+ if err == nil {
+ _, err = w.WriteAt(dataSlice, int64(offset))
+ }
+
+ return
+
+}
+
func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
@@ -168,7 +196,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
}
if err != nil {
fileSize, _, _ := r.GetStat()
- println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
+ println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
}
return dataSlice, err
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index b25b5e89a..ba1fd3d1e 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -2,6 +2,7 @@ package needle_map
import (
"fmt"
+ "io"
"os"
"github.com/syndtr/goleveldb/leveldb"
@@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
}
defer idxFile.Close()
- return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error {
+ return cm.LoadFromReaderAt(idxFile)
+
+}
+
+func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
+
+ return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
if offset.IsZero() || size.IsDeleted() {
return cm.Delete(key)
}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 9716e9729..31c86d124 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -152,8 +152,10 @@ func (m *LevelDbNeedleMap) Close() {
glog.Warningf("close index file %s failed: %v", indexFileName, err)
}
- if err := m.db.Close(); err != nil {
- glog.Warningf("close levelDB failed: %v", err)
+ if m.db != nil {
+ if err := m.db.Close(); err != nil {
+ glog.Warningf("close levelDB failed: %v", err)
+ }
}
}
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 3449ff9dc..662b90531 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
}
func (m *SortedFileNeedleMap) Close() {
- m.indexFile.Close()
- m.dbFile.Close()
+ if m.indexFile != nil {
+ m.indexFile.Close()
+ }
+ if m.dbFile != nil {
+ m.dbFile.Close()
+ }
}
func (m *SortedFileNeedleMap) Destroy() error {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 47829666a..6be15a4c9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -106,6 +106,9 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
if diskType != location.DiskType {
continue
}
+ if location.isDiskSpaceLow {
+ continue
+ }
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
currentFreeCount *= erasure_coding.DataShardsCount
currentFreeCount -= location.EcVolumesLen()
@@ -217,23 +220,36 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
location.volumesLock.RLock()
for _, v := range location.volumes {
curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
+ if volumeMessage == nil {
+ continue
+ }
if maxFileKey < curMaxFileKey {
maxFileKey = curMaxFileKey
}
+ deleteVolume := false
if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
} else {
glog.V(0).Infof("volume %d is expired", v.Id)
}
if v.lastIoError != nil {
deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
}
}
- collectionVolumeSize[v.Collection] += volumeMessage.Size
+
+ if _, exist := collectionVolumeSize[v.Collection]; !exist {
+ collectionVolumeSize[v.Collection] = 0
+ }
+ if !deleteVolume {
+ collectionVolumeSize[v.Collection] += volumeMessage.Size
+ }
+
if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
"IsReadOnly": 0,
@@ -242,7 +258,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
"isDiskSpaceLow": 0,
}
}
- if v.IsReadOnly() {
+ if !deleteVolume && v.IsReadOnly() {
collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
if v.noWriteOrDelete {
collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
@@ -267,7 +283,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
glog.V(0).Infof("volume %d is deleted", vid)
}
} else {
- glog.V(0).Infof("delete volume %d: %v", vid, err)
+ glog.Warningf("delete volume %d: %v", vid, err)
}
}
location.volumesLock.Unlock()
@@ -446,7 +462,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
// load, modify, save
baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
vifFile := filepath.Join(location.Directory, baseFileName+".vif")
- volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+ volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile)
if err != nil {
return fmt.Errorf("volume %d fail to load vif", i)
}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index a9b6a8ff3..9702fdd50 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -200,7 +200,6 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur
return
}
glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
- forgetShardId(ecVolume, shardId)
}
// try reading by recovering from other shards
@@ -303,7 +302,7 @@ func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId type
break
}
if receiveErr != nil {
- return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
+ return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
}
if resp.IsDeleted {
is_deleted = true
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index 65ec53819..a263e6669 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
}
func (rp *ReplicaPlacement) Byte() byte {
+ if rp == nil {
+ return 0
+ }
ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
return byte(ret)
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 366449c53..e0638d8a8 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -234,10 +234,16 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
return false
}
-func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) {
+func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
- glog.V(3).Infof("CollectStatus volume %d", v.Id)
+ glog.V(3).Infof("collectStatus volume %d", v.Id)
+
+ if v.nm == nil {
+ return
+ }
+
+ ok = true
maxFileKey = v.nm.MaxFileKey()
datFileSize, modTime, _ = v.DataBackend.GetStat()
@@ -251,7 +257,11 @@ func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64,
func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
- maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus()
+ maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus()
+
+ if !ok {
+ return 0, nil
+ }
volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index bff1055bb..0cf603ad8 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -39,12 +39,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
}()
- hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
+ hasVolumeInfoFile := v.maybeLoadVolumeInfo()
if v.HasRemoteFile() {
v.noWriteCanDelete = true
v.noWriteOrDelete = false
- glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+ glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
@@ -83,6 +83,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if alreadyHasSuperBlock {
err = v.readSuperBlock()
+ glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
+ if v.HasRemoteFile() {
+ // maybe temporary network problem
+ glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
+ err = nil
+ }
} else {
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
new file mode 100644
index 000000000..f689eeec0
--- /dev/null
+++ b/weed/storage/volume_read.go
@@ -0,0 +1,131 @@
+package storage
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return -1, ErrorNotFound
+ }
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ glog.V(3).Infof("reading deleted %s", n.String())
+ readSize = -readSize
+ } else {
+ return -1, ErrorDeleted
+ }
+ }
+ if readSize == 0 {
+ return 0, nil
+ }
+ err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ v.checkReadWriteError(err)
+ if err != nil {
+ return 0, err
+ }
+ bytesRead := len(n.Data)
+ if !n.HasTtl() {
+ return bytesRead, nil
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
+ return bytesRead, nil
+ }
+ return -1, ErrorNotFound
+}
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
+}
+
+type VolumeFileScanner interface {
+ VisitSuperBlock(super_block.SuperBlock) error
+ ReadNeedleBody() bool
+ VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
+}
+
+func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
+ needleMapKind NeedleMapKind,
+ volumeFileScanner VolumeFileScanner) (err error) {
+ var v *Volume
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+ return fmt.Errorf("failed to load volume %d: %v", id, err)
+ }
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+ }
+ defer v.Close()
+
+ version := v.Version()
+
+ offset := int64(v.SuperBlock.BlockSize())
+
+ return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
+}
+
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+ n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
+ if e != nil {
+ if e == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+ }
+ for n != nil {
+ var needleBody []byte
+ if volumeFileScanner.ReadNeedleBody() {
+ // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
+ if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
+ glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
+ // err = fmt.Errorf("cannot read needle body: %v", err)
+ // return
+ }
+ }
+ err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ glog.V(0).Infof("visit needle error: %v", err)
+ return fmt.Errorf("visit needle error: %v", err)
+ }
+ offset += NeedleHeaderSize + rest
+ glog.V(4).Infof("==> new entry offset %d", offset)
+ if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
+ }
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ }
+ return nil
+}
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
new file mode 100644
index 000000000..d229bdf20
--- /dev/null
+++ b/weed/storage/volume_stream_write.go
@@ -0,0 +1,104 @@
+package storage
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ df, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("unexpected volume backend")
+ }
+ offset, _, _ := v.DataBackend.GetStat()
+
+ header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
+ CookieToBytes(header[0:CookieSize], n.Cookie)
+ NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
+ n.Size = 4 + Size(dataSize) + 1
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+
+ n.DataSize = dataSize
+
+ // needle header
+ df.Write(header[0:NeedleHeaderSize])
+
+ // data size and data
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ df.Write(header[0:4])
+ // write and calculate CRC
+ crcWriter := needle.NewCRCwriter(df)
+ io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
+
+ // flags
+ util.Uint8toBytes(header[0:1], n.Flags)
+ df.Write(header[0:1])
+
+ // data checksum
+ util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
+ // write timestamp, padding
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
+ padding := needle.PaddingLength(n.Size, needle.Version3)
+ df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
+
+ // add to needle map
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ return
+}
+
+func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+
+ sr := &StreamReader{
+ readerAt: v.DataBackend,
+ offset: nv.Offset.ToActualOffset(),
+ }
+ bufReader := bufio.NewReader(sr)
+ bufReader.Discard(NeedleHeaderSize)
+ sizeBuf := make([]byte, 4)
+ bufReader.Read(sizeBuf)
+ if _, err = writer.Write(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
+
+ return
+}
+
+type StreamReader struct {
+ offset int64
+ readerAt io.ReaderAt
+}
+
+func (sr *StreamReader) Read(p []byte) (n int, err error) {
+ n, err = sr.readerAt.ReadAt(p, sr.offset)
+ if err != nil {
+ return
+ }
+ sr.offset += int64(n)
+ return
+}
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
index 77efd8a14..23160906b 100644
--- a/weed/storage/volume_tier.go
+++ b/weed/storage/volume_tier.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
@@ -14,13 +15,23 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
- v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+ var err error
+ v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+
+ if v.volumeInfo.Version == 0 {
+ v.volumeInfo.Version = uint32(needle.CurrentVersion)
+ }
if v.hasRemoteFile {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
}
+ if err != nil {
+ glog.Warningf("load volume %d.vif file: %v", v.Id, err)
+ return
+ }
+
return
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0ee1e61c6..be84f8a13 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
- dstDatBackend.Append(needleBytes)
+ dstDatBackend.Write(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_write.go
index 07376bc88..a286c5dd5 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_write.go
@@ -4,14 +4,12 @@ import (
"bytes"
"errors"
"fmt"
- "io"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -104,47 +102,8 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
return
}
- if v.isFileUnchanged(n) {
- size = Size(n.DataSize)
- isUnchanged = true
- return
- }
-
- // check whether existing needle cookie matches
- nv, ok := v.nm.Get(n.Id)
- if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
- if existingNeedleReadErr != nil {
- err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
- return
- }
- if existingNeedle.Cookie != n.Cookie {
- glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
- err = fmt.Errorf("mismatching cookie %x", n.Cookie)
- return
- }
- }
-
- // append to dat file
- n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, size, _, err = n.Append(v.DataBackend, v.Version())
- v.checkReadWriteError(err)
- if err != nil {
- return
- }
- v.lastAppendAtNs = n.AppendAtNs
-
- // add to needle map
- if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
- if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
- glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
- }
- }
- if v.lastModifiedTsSeconds < n.LastModified {
- v.lastModifiedTsSeconds = n.LastModified
- }
- return
+ return v.doWriteRequest(n)
}
func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
@@ -185,7 +144,8 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
return
}
if existingNeedle.Cookie != n.Cookie {
- glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
+ glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
+ needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
err = fmt.Errorf("mismatching cookie %x", n.Cookie)
return
}
@@ -223,24 +183,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
return 0, err
}
- nv, ok := v.nm.Get(n.Id)
- // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok && nv.Size.IsValid() {
- size := nv.Size
- n.Data = nil
- n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.DataBackend, v.Version())
- v.checkReadWriteError(err)
- if err != nil {
- return size, err
- }
- v.lastAppendAtNs = n.AppendAtNs
- if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
- return size, err
- }
- return size, err
- }
- return 0, nil
+ return v.doDeleteRequest(n)
}
func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
@@ -282,52 +225,6 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
return 0, nil
}
-// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
- v.dataFileAccessLock.RLock()
- defer v.dataFileAccessLock.RUnlock()
-
- nv, ok := v.nm.Get(n.Id)
- if !ok || nv.Offset.IsZero() {
- return -1, ErrorNotFound
- }
- readSize := nv.Size
- if readSize.IsDeleted() {
- if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
- glog.V(3).Infof("reading deleted %s", n.String())
- readSize = -readSize
- } else {
- return -1, ErrorDeleted
- }
- }
- if readSize == 0 {
- return 0, nil
- }
- err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
- if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
- err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
- }
- v.checkReadWriteError(err)
- if err != nil {
- return 0, err
- }
- bytesRead := len(n.Data)
- if !n.HasTtl() {
- return bytesRead, nil
- }
- ttlMinutes := n.Ttl.Minutes()
- if ttlMinutes == 0 {
- return bytesRead, nil
- }
- if !n.HasLastModifiedDate() {
- return bytesRead, nil
- }
- if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
- return bytesRead, nil
- }
- return -1, ErrorNotFound
-}
-
func (v *Volume) startWorker() {
go func() {
chanClosed := false
@@ -403,66 +300,28 @@ func (v *Volume) startWorker() {
}()
}
-type VolumeFileScanner interface {
- VisitSuperBlock(super_block.SuperBlock) error
- ReadNeedleBody() bool
- VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
-}
-
-func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
- needleMapKind NeedleMapKind,
- volumeFileScanner VolumeFileScanner) (err error) {
- var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
- return fmt.Errorf("failed to load volume %d: %v", id, err)
- }
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
- }
- defer v.Close()
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
- version := v.Version()
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
- offset := int64(v.SuperBlock.BlockSize())
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+ return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ }
- return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
-}
+ appendAtNs := uint64(time.Now().UnixNano())
+ offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
-func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
- if e != nil {
- if e == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+ v.checkReadWriteError(err)
+ if err != nil {
+ return err
}
- for n != nil {
- var needleBody []byte
- if volumeFileScanner.ReadNeedleBody() {
- // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
- if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
- // err = fmt.Errorf("cannot read needle body: %v", err)
- // return
- }
- }
- err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- glog.V(0).Infof("visit needle error: %v", err)
- return fmt.Errorf("visit needle error: %v", err)
- }
- offset += NeedleHeaderSize + rest
- glog.V(4).Infof("==> new entry offset %d", offset)
- if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
- }
- glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ v.lastAppendAtNs = appendAtNs
+
+ // add to needle map
+ if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
+ glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
}
- return nil
+
+ return err
}