Diffstat (limited to 'weed/storage')
-rw-r--r--  weed/storage/backend/backend.go                |   6
-rw-r--r--  weed/storage/backend/disk_file.go              |   2
-rw-r--r--  weed/storage/backend/s3_backend/s3_sessions.go |   5
-rw-r--r--  weed/storage/disk_location.go                  |   2
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go       |   2
-rw-r--r--  weed/storage/needle/crc.go                     |  23
-rw-r--r--  weed/storage/needle/needle_read_write.go       |  36
-rw-r--r--  weed/storage/needle_map/memdb.go               |   9
-rw-r--r--  weed/storage/needle_map_leveldb.go             |   6
-rw-r--r--  weed/storage/needle_map_sorted_file.go         |   8
-rw-r--r--  weed/storage/store.go                          |  24
-rw-r--r--  weed/storage/store_ec.go                       |   3
-rw-r--r--  weed/storage/super_block/replica_placement.go  |   3
-rw-r--r--  weed/storage/volume.go                         |  16
-rw-r--r--  weed/storage/volume_loading.go                 |  10
-rw-r--r--  weed/storage/volume_read.go                    | 131
-rw-r--r--  weed/storage/volume_stream_write.go            | 104
-rw-r--r--  weed/storage/volume_tier.go                    |  13
-rw-r--r--  weed/storage/volume_vacuum.go                  |   2
-rw-r--r--  weed/storage/volume_write.go (renamed from weed/storage/volume_read_write.go) | 185
20 files changed, 400 insertions, 190 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index b8b883be6..2dc61d02e 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -58,6 +58,9 @@ func LoadConfiguration(config *util.ViperProxy) {
 			if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
 				continue
 			}
+			if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found {
+				continue
+			}
 			backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
 				StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
 			if buildErr != nil {
@@ -81,6 +84,9 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
 			glog.Warningf("storage type %s not found", storageBackend.Type)
 			continue
 		}
+		if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found {
+			continue
+		}
 		backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
 		if buildErr != nil {
 			glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 498963c31..3b42429cf 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
 	return
 }
 
-func (df *DiskFile) Append(p []byte) (n int, err error) {
+func (df *DiskFile) Write(p []byte) (n int, err error) {
 	return df.WriteAt(p, df.fileSize)
 }
 
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index e2fdf1eb6..b8378c379 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -34,8 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string)
 	}
 
 	config := &aws.Config{
-		Region:   aws.String(region),
-		Endpoint: aws.String(endpoint),
+		Region:           aws.String(region),
+		Endpoint:         aws.String(endpoint),
+		S3ForcePathStyle: aws.Bool(true),
 	}
 	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
 		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 6de87c793..ed4e00312 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -131,7 +131,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
 		l.SetVolume(vid, v)
 
 		size, _, _ := v.FileStat()
-		glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
+		glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
 			l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
 		return true
 	}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 85d6a5fc8..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -63,7 +63,7 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
 
 	// read volume info
 	ev.Version = needle.Version3
-	if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
+	if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
 		ev.Version = needle.Version(volumeInfo.Version)
 	} else {
 		pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
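The s3_sessions.go hunk above switches the S3 backend to path-style addressing, which self-hosted S3-compatible stores (MinIO, Ceph RGW, and similar) usually require, since bucket-named virtual hosts need wildcard DNS. A minimal sketch of the resulting client setup, assuming aws-sdk-go v1; the endpoint, region, and helper name below are illustrative, not part of the change:

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// newS3Client mirrors the config that createSession builds after this change.
func newS3Client() (*s3.S3, error) {
	config := &aws.Config{
		Region:   aws.String("us-east-1"),
		Endpoint: aws.String("http://localhost:9000"), // e.g. a local MinIO
		// Use path-style URLs (http://host/bucket/key) instead of
		// bucket-as-hostname, which non-AWS endpoints often cannot serve.
		S3ForcePathStyle: aws.Bool(true),
	}
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}
	return s3.New(sess), nil
}

func main() {
	client, err := newS3Client()
	if err != nil {
		log.Fatal(err)
	}
	_ = client // ready for PutObject / GetObject calls
}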
&volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go index 6fd910bb7..4476631c2 100644 --- a/weed/storage/needle/crc.go +++ b/weed/storage/needle/crc.go @@ -2,6 +2,7 @@ package needle import ( "fmt" + "io" "github.com/klauspost/crc32" @@ -29,3 +30,25 @@ func (n *Needle) Etag() string { util.Uint32toBytes(bits, uint32(n.Checksum)) return fmt.Sprintf("%x", bits) } + +func NewCRCwriter(w io.Writer) *CRCwriter { + + return &CRCwriter{ + crc: CRC(0), + w: w, + } + +} + +type CRCwriter struct { + crc CRC + w io.Writer +} + +func (c *CRCwriter) Write(p []byte) (n int, err error) { + n, err = c.w.Write(p) // with each write ... + c.crc = c.crc.Update(p) + return +} + +func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 0f72bc0bb..16c2fd06b 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -3,13 +3,12 @@ package needle import ( "errors" "fmt" - "io" - "math" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" . "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" ) const ( @@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return offset, size, actualSize, err } +func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) { + + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.BackendStorageFile, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + + if version == Version3 { + tsOffset := NeedleHeaderSize + size + NeedleChecksumSize + util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs) + } + + if err == nil { + _, err = w.WriteAt(dataSlice, int64(offset)) + } + + return + +} + func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) @@ -168,7 +196,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi } if err != nil { fileSize, _, _ := r.GetStat() - println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) + println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) } return dataSlice, err diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index b25b5e89a..ba1fd3d1e 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -2,6 +2,7 @@ package needle_map import ( "fmt" + "io" "os" "github.com/syndtr/goleveldb/leveldb" @@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { } defer idxFile.Close() - return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error { + return cm.LoadFromReaderAt(idxFile) + +} + +func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) { + + return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error { if offset.IsZero() || size.IsDeleted() { return cm.Delete(key) } diff --git 
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 0f72bc0bb..16c2fd06b 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,13 +3,12 @@ package needle
 import (
 	"errors"
 	"fmt"
-	"io"
-	"math"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"io"
+	"math"
 )
 
 const (
@@ -156,6 +155,35 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
 	return offset, size, actualSize, err
 }
 
+func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
+
+	if end, _, e := w.GetStat(); e == nil {
+		defer func(w backend.BackendStorageFile, off int64) {
+			if err != nil {
+				if te := w.Truncate(end); te != nil {
+					glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+				}
+			}
+		}(w, end)
+		offset = uint64(end)
+	} else {
+		err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+		return
+	}
+
+	if version == Version3 {
+		tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
+		util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
+	}
+
+	if err == nil {
+		_, err = w.WriteAt(dataSlice, int64(offset))
+	}
+
+	return
+
+}
+
 func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
 
 	dataSize := GetActualSize(size, version)
@@ -168,7 +196,7 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
 	}
 	if err != nil {
 		fileSize, _, _ := r.GetStat()
-		println("n",n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
+		println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
 	}
 
 	return dataSlice, err
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index b25b5e89a..ba1fd3d1e 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -2,6 +2,7 @@ package needle_map
 
 import (
 	"fmt"
+	"io"
 	"os"
 
 	"github.com/syndtr/goleveldb/leveldb"
@@ -104,7 +105,13 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
 	}
 	defer idxFile.Close()
 
-	return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size Size) error {
+	return cm.LoadFromReaderAt(idxFile)
+
+}
+
+func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
+
+	return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
 		if offset.IsZero() || size.IsDeleted() {
 			return cm.Delete(key)
 		}
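LoadFromIdx is now a thin wrapper over the new LoadFromReaderAt, so an index can be rebuilt from any io.ReaderAt, not only a named .idx file on disk. A sketch, with the index bytes left as a placeholder:

package main

import (
	"bytes"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
)

func main() {
	// Placeholder: in practice these bytes would come from a remote or
	// in-memory copy of a volume's .idx file.
	var idxBytes []byte

	cm := needle_map.NewMemDb()

	// bytes.Reader implements io.ReaderAt, which is all the loader needs.
	if err := cm.LoadFromReaderAt(bytes.NewReader(idxBytes)); err != nil {
		fmt.Println("load index:", err)
	}
}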
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 9716e9729..31c86d124 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -152,8 +152,10 @@ func (m *LevelDbNeedleMap) Close() {
 		glog.Warningf("close index file %s failed: %v", indexFileName, err)
 	}
 
-	if err := m.db.Close(); err != nil {
-		glog.Warningf("close levelDB failed: %v", err)
+	if m.db != nil {
+		if err := m.db.Close(); err != nil {
+			glog.Warningf("close levelDB failed: %v", err)
+		}
 	}
 }
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 3449ff9dc..662b90531 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
 }
 
 func (m *SortedFileNeedleMap) Close() {
-	m.indexFile.Close()
-	m.dbFile.Close()
+	if m.indexFile != nil {
+		m.indexFile.Close()
+	}
+	if m.dbFile != nil {
+		m.dbFile.Close()
+	}
 }
 
 func (m *SortedFileNeedleMap) Destroy() error {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 47829666a..6be15a4c9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -106,6 +106,9 @@ func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
 		if diskType != location.DiskType {
 			continue
 		}
+		if location.isDiskSpaceLow {
+			continue
+		}
 		currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
 		currentFreeCount *= erasure_coding.DataShardsCount
 		currentFreeCount -= location.EcVolumesLen()
@@ -217,23 +220,36 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 		location.volumesLock.RLock()
 		for _, v := range location.volumes {
 			curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
+			if volumeMessage == nil {
+				continue
+			}
 			if maxFileKey < curMaxFileKey {
 				maxFileKey = curMaxFileKey
 			}
+			deleteVolume := false
 			if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
 				volumeMessages = append(volumeMessages, volumeMessage)
 			} else {
 				if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
 					deleteVids = append(deleteVids, v.Id)
+					deleteVolume = true
 				} else {
 					glog.V(0).Infof("volume %d is expired", v.Id)
 				}
 				if v.lastIoError != nil {
 					deleteVids = append(deleteVids, v.Id)
+					deleteVolume = true
 					glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
 				}
 			}
-			collectionVolumeSize[v.Collection] += volumeMessage.Size
+
+			if _, exist := collectionVolumeSize[v.Collection]; !exist {
+				collectionVolumeSize[v.Collection] = 0
+			}
+			if !deleteVolume {
+				collectionVolumeSize[v.Collection] += volumeMessage.Size
+			}
+
 			if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
 				collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
 					"IsReadOnly":       0,
@@ -242,7 +258,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 					"isDiskSpaceLow":   0,
 				}
 			}
-			if v.IsReadOnly() {
+			if !deleteVolume && v.IsReadOnly() {
 				collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
 				if v.noWriteOrDelete {
 					collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
@@ -267,7 +283,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 					glog.V(0).Infof("volume %d is deleted", vid)
 				}
 			} else {
-				glog.V(0).Infof("delete volume %d: %v", vid, err)
+				glog.Warningf("delete volume %d: %v", vid, err)
 			}
 		}
 		location.volumesLock.Unlock()
 	}
@@ -446,7 +462,7 @@ func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
 
 		// load, modify, save
 		baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
 		vifFile := filepath.Join(location.Directory, baseFileName+".vif")
-		volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+		volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile)
 		if err != nil {
 			return fmt.Errorf("volume %d fail to load vif", i)
 		}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index a9b6a8ff3..9702fdd50 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -200,7 +200,6 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur
 			return
 		}
 		glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
-		forgetShardId(ecVolume, shardId)
 	}
 
 	// try reading by recovering from other shards
@@ -303,7 +302,7 @@ func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId type
 			break
 		}
 		if receiveErr != nil {
-			return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
+			return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
 		}
 		if resp.IsDeleted {
 			is_deleted = true
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index 65ec53819..a263e6669 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
 }
 
 func (rp *ReplicaPlacement) Byte() byte {
+	if rp == nil {
+		return 0
+	}
 	ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
 	return byte(ret)
 }
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 366449c53..e0638d8a8 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -234,10 +234,16 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
 	return false
 }
 
-func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64) {
+func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
 	v.dataFileAccessLock.RLock()
 	defer v.dataFileAccessLock.RUnlock()
-	glog.V(3).Infof("CollectStatus volume %d", v.Id)
+	glog.V(3).Infof("collectStatus volume %d", v.Id)
+
+	if v.nm == nil {
+		return
+	}
+
+	ok = true
 
 	maxFileKey = v.nm.MaxFileKey()
 	datFileSize, modTime, _ = v.DataBackend.GetStat()
@@ -251,7 +257,11 @@ func (v *Volume) CollectStatus() (maxFileKey types.NeedleId, datFileSize int64,
 
 func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
 
-	maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize := v.CollectStatus()
+	maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus()
+
+	if !ok {
+		return 0, nil
+	}
 
 	volumeInfo := &master_pb.VolumeInformationMessage{
 		Id: uint32(v.Id),
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index bff1055bb..0cf603ad8 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -39,12 +39,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
 		}
 	}()
 
-	hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
+	hasVolumeInfoFile := v.maybeLoadVolumeInfo()
 
 	if v.HasRemoteFile() {
 		v.noWriteCanDelete = true
 		v.noWriteOrDelete = false
-		glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+		glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
 		v.LoadRemoteFile()
 		alreadyHasSuperBlock = true
 	} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
@@ -83,6 +83,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
 
 	if alreadyHasSuperBlock {
 		err = v.readSuperBlock()
+		glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
+		if v.HasRemoteFile() {
+			// maybe temporary network problem
+			glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
+			err = nil
+		}
 	} else {
 		if !v.SuperBlock.Initialized() {
 			return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
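For the nil guard added to ReplicaPlacement.Byte() above: the placement encodes its three counts as a decimal byte (DiffDataCenterCount*100 + DiffRackCount*10 + SameRackCount), so the placement string "110" round-trips through byte 110, and a nil placement now encodes as 0 instead of panicking. A round-trip sketch:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func main() {
	rp, err := super_block.NewReplicaPlacementFromString("110")
	if err != nil {
		panic(err)
	}
	fmt.Println(rp.Byte()) // 110

	// Calling a pointer-receiver method on a nil pointer is legal in Go;
	// the new receiver check turns it into a harmless zero placement.
	var nilRp *super_block.ReplicaPlacement
	fmt.Println(nilRp.Byte()) // 0
}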
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
new file mode 100644
index 000000000..f689eeec0
--- /dev/null
+++ b/weed/storage/volume_read.go
@@ -0,0 +1,131 @@
+package storage
+
+import (
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+	. "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
+	v.dataFileAccessLock.RLock()
+	defer v.dataFileAccessLock.RUnlock()
+
+	nv, ok := v.nm.Get(n.Id)
+	if !ok || nv.Offset.IsZero() {
+		return -1, ErrorNotFound
+	}
+	readSize := nv.Size
+	if readSize.IsDeleted() {
+		if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+			glog.V(3).Infof("reading deleted %s", n.String())
+			readSize = -readSize
+		} else {
+			return -1, ErrorDeleted
+		}
+	}
+	if readSize == 0 {
+		return 0, nil
+	}
+	err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+	if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+		err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+	}
+	v.checkReadWriteError(err)
+	if err != nil {
+		return 0, err
+	}
+	bytesRead := len(n.Data)
+	if !n.HasTtl() {
+		return bytesRead, nil
+	}
+	ttlMinutes := n.Ttl.Minutes()
+	if ttlMinutes == 0 {
+		return bytesRead, nil
+	}
+	if !n.HasLastModifiedDate() {
+		return bytesRead, nil
+	}
+	if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
+		return bytesRead, nil
+	}
+	return -1, ErrorNotFound
+}
+
+// read fills in Needle content by looking up n.Id from NeedleMapper
+func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
+	v.dataFileAccessLock.RLock()
+	defer v.dataFileAccessLock.RUnlock()
+
+	return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
+}
+
+type VolumeFileScanner interface {
+	VisitSuperBlock(super_block.SuperBlock) error
+	ReadNeedleBody() bool
+	VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
+}
+
+func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
+	needleMapKind NeedleMapKind,
+	volumeFileScanner VolumeFileScanner) (err error) {
+	var v *Volume
+	if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+		return fmt.Errorf("failed to load volume %d: %v", id, err)
+	}
+	if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+		return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+	}
+	defer v.Close()
+
+	version := v.Version()
+
+	offset := int64(v.SuperBlock.BlockSize())
+
+	return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
+}
+
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+	n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
+	if e != nil {
+		if e == io.EOF {
+			return nil
+		}
+		return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+	}
+	for n != nil {
+		var needleBody []byte
+		if volumeFileScanner.ReadNeedleBody() {
+			// println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
+			if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
+				glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
+				// err = fmt.Errorf("cannot read needle body: %v", err)
+				// return
+			}
+		}
+		err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			glog.V(0).Infof("visit needle error: %v", err)
+			return fmt.Errorf("visit needle error: %v", err)
+		}
+		offset += NeedleHeaderSize + rest
+		glog.V(4).Infof("==> new entry offset %d", offset)
+		if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
+		}
+		glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+	}
+	return nil
+}
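volume_read.go now owns the scanning entry points that used to live in volume_read_write.go. A minimal VolumeFileScanner implementation against the interface above; the countingScanner type and countNeedles helper are illustrative, not part of the change:

package storage

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

type countingScanner struct {
	count int
}

func (s *countingScanner) VisitSuperBlock(sb super_block.SuperBlock) error {
	return nil // version and replica placement live here if needed
}

func (s *countingScanner) ReadNeedleBody() bool {
	return false // headers suffice for counting; bodies are skipped
}

func (s *countingScanner) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	s.count++
	return nil
}

func countNeedles(dir string, vid needle.VolumeId) (int, error) {
	scanner := &countingScanner{}
	if err := ScanVolumeFile(dir, "", vid, NeedleMapInMemory, scanner); err != nil {
		return 0, fmt.Errorf("scan volume %d: %v", vid, err)
	}
	return scanner.count, nil
}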
"github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + df, ok := v.DataBackend.(*backend.DiskFile) + if !ok { + return fmt.Errorf("unexpected volume backend") + } + offset, _, _ := v.DataBackend.GetStat() + + header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation + CookieToBytes(header[0:CookieSize], n.Cookie) + NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) + n.Size = 4 + Size(dataSize) + 1 + SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) + + n.DataSize = dataSize + + // needle header + df.Write(header[0:NeedleHeaderSize]) + + // data size and data + util.Uint32toBytes(header[0:4], n.DataSize) + df.Write(header[0:4]) + // write and calculate CRC + crcWriter := needle.NewCRCwriter(df) + io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) + + // flags + util.Uint8toBytes(header[0:1], n.Flags) + df.Write(header[0:1]) + + // data checksum + util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) + // write timestamp, padding + n.AppendAtNs = uint64(time.Now().UnixNano()) + util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) + padding := needle.PaddingLength(n.Size, needle.Version3) + df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) + + // add to needle map + if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) + } + return +} + +func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return ErrorNotFound + } + + sr := &StreamReader{ + readerAt: v.DataBackend, + offset: nv.Offset.ToActualOffset(), + } + bufReader := bufio.NewReader(sr) + bufReader.Discard(NeedleHeaderSize) + sizeBuf := make([]byte, 4) + bufReader.Read(sizeBuf) + if _, err = writer.Write(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) + + return +} + +type StreamReader struct { + offset int64 + readerAt io.ReaderAt +} + +func (sr *StreamReader) Read(p []byte) (n int, err error) { + n, err = sr.readerAt.ReadAt(p, sr.offset) + if err != nil { + return + } + sr.offset += int64(n) + return +} diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go index 77efd8a14..23160906b 100644 --- a/weed/storage/volume_tier.go +++ b/weed/storage/volume_tier.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { @@ -14,13 +15,23 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { func (v *Volume) maybeLoadVolumeInfo() (found bool) { - v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) + var err error + v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) + + if v.volumeInfo.Version == 0 { + v.volumeInfo.Version = uint32(needle.CurrentVersion) + } if v.hasRemoteFile 
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
index 77efd8a14..23160906b 100644
--- a/weed/storage/volume_tier.go
+++ b/weed/storage/volume_tier.go
@@ -6,6 +6,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 )
 
 func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
@@ -14,13 +15,23 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
 
 func (v *Volume) maybeLoadVolumeInfo() (found bool) {
 
-	v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+	var err error
+	v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
+
+	if v.volumeInfo.Version == 0 {
+		v.volumeInfo.Version = uint32(needle.CurrentVersion)
+	}
 
 	if v.hasRemoteFile {
 		glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
 			v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
 	}
 
+	if err != nil {
+		glog.Warningf("load volume %d.vif file: %v", v.Id, err)
+		return
+	}
+
 	return
 }
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0ee1e61c6..be84f8a13 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 			if err != nil {
 				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
 			}
-			dstDatBackend.Append(needleBytes)
+			dstDatBackend.Write(needleBytes)
 			util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
 		} else { //deleted needle
 			//fakeDelNeedle 's default Data field is nil
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_write.go
index 07376bc88..a286c5dd5 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_write.go
@@ -4,14 +4,12 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"io"
 	"os"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
-	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 )
 
@@ -104,47 +102,8 @@ func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchan
 		err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
 		return
 	}
-	if v.isFileUnchanged(n) {
-		size = Size(n.DataSize)
-		isUnchanged = true
-		return
-	}
-
-	// check whether existing needle cookie matches
-	nv, ok := v.nm.Get(n.Id)
-	if ok {
-		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
-		if existingNeedleReadErr != nil {
-			err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
-			return
-		}
-		if existingNeedle.Cookie != n.Cookie {
-			glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
-			err = fmt.Errorf("mismatching cookie %x", n.Cookie)
-			return
-		}
-	}
-
-	// append to dat file
-	n.AppendAtNs = uint64(time.Now().UnixNano())
-	offset, size, _, err = n.Append(v.DataBackend, v.Version())
-	v.checkReadWriteError(err)
-	if err != nil {
-		return
-	}
-	v.lastAppendAtNs = n.AppendAtNs
-
-	// add to needle map
-	if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
-		if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
-			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
-		}
-	}
-	if v.lastModifiedTsSeconds < n.LastModified {
-		v.lastModifiedTsSeconds = n.LastModified
-	}
-	return
+
+	return v.doWriteRequest(n)
 }
 
 func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
@@ -185,7 +144,8 @@ func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isU
 			return
 		}
 		if existingNeedle.Cookie != n.Cookie {
-			glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
+			glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
+				needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
 			err = fmt.Errorf("mismatching cookie %x", n.Cookie)
 			return
 		}
@@ -223,24 +183,7 @@ func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
 		return 0, err
 	}
 
-	nv, ok := v.nm.Get(n.Id)
-	// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
-	if ok && nv.Size.IsValid() {
-		size := nv.Size
-		n.Data = nil
-		n.AppendAtNs = uint64(time.Now().UnixNano())
-		offset, _, _, err := n.Append(v.DataBackend, v.Version())
-		v.checkReadWriteError(err)
-		if err != nil {
-			return size, err
-		}
-		v.lastAppendAtNs = n.AppendAtNs
-		if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
-			return size, err
-		}
-		return size, err
-	}
-	return 0, nil
+	return v.doDeleteRequest(n)
 }
 
 func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
@@ -282,52 +225,6 @@ func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
 	return 0, nil
 }
 
-// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
-	v.dataFileAccessLock.RLock()
-	defer v.dataFileAccessLock.RUnlock()
-
-	nv, ok := v.nm.Get(n.Id)
-	if !ok || nv.Offset.IsZero() {
-		return -1, ErrorNotFound
-	}
-	readSize := nv.Size
-	if readSize.IsDeleted() {
-		if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
-			glog.V(3).Infof("reading deleted %s", n.String())
-			readSize = -readSize
-		} else {
-			return -1, ErrorDeleted
-		}
-	}
-	if readSize == 0 {
-		return 0, nil
-	}
-	err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
-	if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
-		err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
-	}
-	v.checkReadWriteError(err)
-	if err != nil {
-		return 0, err
-	}
-	bytesRead := len(n.Data)
-	if !n.HasTtl() {
-		return bytesRead, nil
-	}
-	ttlMinutes := n.Ttl.Minutes()
-	if ttlMinutes == 0 {
-		return bytesRead, nil
-	}
-	if !n.HasLastModifiedDate() {
-		return bytesRead, nil
-	}
-	if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
-		return bytesRead, nil
-	}
-	return -1, ErrorNotFound
-}
-
 func (v *Volume) startWorker() {
 	go func() {
 		chanClosed := false
@@ -403,66 +300,28 @@ func (v *Volume) startWorker() {
 	}()
 }
 
-type VolumeFileScanner interface {
-	VisitSuperBlock(super_block.SuperBlock) error
-	ReadNeedleBody() bool
-	VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
-}
-
-func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
-	needleMapKind NeedleMapKind,
-	volumeFileScanner VolumeFileScanner) (err error) {
-	var v *Volume
-	if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
-		return fmt.Errorf("failed to load volume %d: %v", id, err)
-	}
-	if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
-		return fmt.Errorf("failed to process volume %d super block: %v", id, err)
-	}
-	defer v.Close()
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
 
-	version := v.Version()
+	v.dataFileAccessLock.Lock()
+	defer v.dataFileAccessLock.Unlock()
 
-	offset := int64(v.SuperBlock.BlockSize())
+	if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+		return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+	}
current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize()) + } - return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) -} + appendAtNs := uint64(time.Now().UnixNano()) + offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version()) -func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) - if e != nil { - if e == io.EOF { - return nil - } - return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) + v.checkReadWriteError(err) + if err != nil { + return err } - for n != nil { - var needleBody []byte - if volumeFileScanner.ReadNeedleBody() { - // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest) - if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { - glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err) - // err = fmt.Errorf("cannot read needle body: %v", err) - // return - } - } - err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) - if err == io.EOF { - return nil - } - if err != nil { - glog.V(0).Infof("visit needle error: %v", err) - return fmt.Errorf("visit needle error: %v", err) - } - offset += NeedleHeaderSize + rest - glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) - } - glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + v.lastAppendAtNs = appendAtNs + + // add to needle map + if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil { + glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err) } - return nil + + return err } |
