diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-01-21 21:18:01 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-01-21 21:18:01 -0800 |
| commit | c8b2dac6c1d29bab2f96c82ad30b7045b61ab8e6 (patch) | |
| tree | 5caddc4a2bfbb89fd1e674dc52a0db9a02388639 | |
| parent | bb1be616023ebfa4307b484ddcda3dc2720ce14c (diff) | |
| download | seaweedfs-c8b2dac6c1d29bab2f96c82ad30b7045b61ab8e6.tar.xz seaweedfs-c8b2dac6c1d29bab2f96c82ad30b7045b61ab8e6.zip | |
volume: avoid sharing volume dat file handle
possibly help on https://github.com/chrislusf/seaweedfs/issues/1184
| -rw-r--r-- | weed/storage/volume_vacuum.go | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 09bf36f7a..0ca9016c8 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -72,7 +72,7 @@ func (v *Volume) Compact2(preallocate int64) error { v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx", preallocate) + return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate) } func (v *Volume) CommitCompact() error { @@ -353,26 +353,31 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca return } -func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) { +func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) { var ( - dstDatBackend backend.BackendStorageFile + srcDatBackend, dstDatBackend backend.BackendStorageFile + dataFile *os.File ) - if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil { + if dstDatBackend, err = createVolumeFile(dstDatName, preallocate, 0); err != nil { return } defer dstDatBackend.Close() oldNm := needle_map.NewMemDb() newNm := needle_map.NewMemDb() - if err = oldNm.LoadFromIdx(v.FileName()+".idx"); err != nil { + if err = oldNm.LoadFromIdx(srcIdxName); err != nil { return } + if dataFile, err = os.Open(srcDatName); err != nil { + return err + } + srcDatBackend = backend.NewDiskFile(dataFile) now := uint64(time.Now().Unix()) - v.SuperBlock.CompactionRevision++ - dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0) - newOffset := int64(v.SuperBlock.BlockSize()) + sb.CompactionRevision++ + dstDatBackend.WriteAt(sb.Bytes(), 0) + newOffset := int64(sb.BlockSize()) oldNm.AscendingVisit(func(value needle_map.NeedleValue) error { @@ -383,28 +388,28 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate i } n := new(needle.Needle) - err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version()) + err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version) if err != nil { return nil } - if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) { return nil } if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } - if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil { + if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil { return fmt.Errorf("cannot append needle: %s", err) } - newOffset += n.DiskSize(v.Version()) + newOffset += n.DiskSize(version) glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size) return nil }) - newNm.SaveToIdx(idxName) + newNm.SaveToIdx(datIdxName) return } |
