aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-12-31 15:08:32 -0800
committerChris Lu <chris.lu@gmail.com>2018-12-31 15:08:32 -0800
commit1478d7ea211f4b390d3906e8c773f26074900d1a (patch)
treea36d236b9a2d36a9066d6ec5a1ac152adb140de0
parent81904ad3364685d016a7fc1d0335702f1c1cb169 (diff)
downloadseaweedfs-1478d7ea211f4b390d3906e8c773f26074900d1a.tar.xz
seaweedfs-1478d7ea211f4b390d3906e8c773f26074900d1a.zip
reduce file seek when writing
-rw-r--r--weed/storage/needle_read_write.go30
-rw-r--r--weed/storage/store.go2
-rw-r--r--weed/storage/volume_read_write.go34
-rw-r--r--weed/storage/volume_vacuum.go6
-rw-r--r--weed/storage/volume_vacuum_test.go2
5 files changed, 27 insertions, 47 deletions
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go
index 34cf71dc2..6441cb501 100644
--- a/weed/storage/needle_read_write.go
+++ b/weed/storage/needle_read_write.go
@@ -3,7 +3,6 @@ package storage
import (
"errors"
"fmt"
- "io"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -28,20 +27,19 @@ func (n *Needle) DiskSize(version Version) int64 {
return getActualSize(n.Size, version)
}
-func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize int64, err error) {
- if s, ok := w.(io.Seeker); ok {
- if end, e := s.Seek(0, 1); e == nil {
- defer func(s io.Seeker, off int64) {
- if err != nil {
- if _, e = s.Seek(off, 0); e != nil {
- glog.V(0).Infof("Failed to seek %s back to %d with error: %v", w, off, e)
- }
+func (n *Needle) Append(w *os.File, version Version) (offset Offset, size uint32, actualSize int64, err error) {
+ if end, e := w.Seek(0, 2); e == nil {
+ defer func(w *os.File, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
}
- }(s, end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
- return
- }
+ }
+ }(w, end)
+ offset = Offset(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
}
switch version {
case Version1:
@@ -159,9 +157,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, actualSize i
_, err = w.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
}
- return n.DataSize, getActualSize(n.Size, version), err
+ return offset, n.DataSize, getActualSize(n.Size, version), err
}
- return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
+ return 0, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func ReadNeedleBlob(r *os.File, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 8c0299545..7ae5dcee6 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -200,7 +200,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
}
// TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
- size, err = v.writeNeedle(n)
+ _, size, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index ae8dd1d40..e90e26144 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -74,7 +74,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
return
}
-func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
+func (v *Volume) writeNeedle(n *Needle) (offset Offset, size uint32, err error) {
glog.V(4).Infof("writing needle %s", NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
@@ -87,32 +87,15 @@ func (v *Volume) writeNeedle(n *Needle) (size uint32, err error) {
glog.V(4).Infof("needle is unchanged!")
return
}
- var offset int64
- if offset, err = v.dataFile.Seek(0, 2); err != nil {
- glog.V(0).Infof("failed to seek the end of file: %v", err)
- return
- }
-
- //ensure file writing starting from aligned positions
- if offset%NeedlePaddingSize != 0 {
- offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
- if offset, err = v.dataFile.Seek(offset, 0); err != nil {
- glog.V(0).Infof("failed to align in datafile %s: %v", v.dataFile.Name(), err)
- return
- }
- }
n.AppendAtNs = uint64(time.Now().UnixNano())
- if size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
- if e := v.dataFile.Truncate(offset); e != nil {
- err = fmt.Errorf("%s\ncannot truncate %s: %v", err, v.dataFile.Name(), e)
- }
+ if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
return
}
nv, ok := v.nm.Get(n.Id)
- if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil {
+ if !ok || Offset(nv.Offset)*NeedlePaddingSize < offset {
+ if err = v.nm.Put(n.Id, offset/NeedlePaddingSize, n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
}
}
@@ -133,16 +116,15 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) {
//fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
if ok && nv.Size != TombstoneFileSize {
size := nv.Size
- offset, err := v.dataFile.Seek(0, 2)
+ n.Data = nil
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ offset, _, _, err := n.Append(v.dataFile, v.Version())
if err != nil {
return size, err
}
- if err := v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil {
+ if err = v.nm.Delete(n.Id, offset/NeedlePaddingSize); err != nil {
return size, err
}
- n.Data = nil
- n.AppendAtNs = uint64(time.Now().UnixNano())
- _, _, err = n.Append(v.dataFile, v.Version())
return size, err
}
return 0, nil
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index fda568126..5e0b19b66 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -215,7 +215,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
fakeDelNeedle.Id = key
fakeDelNeedle.Cookie = 0x12345678
fakeDelNeedle.AppendAtNs = uint64(time.Now().UnixNano())
- _, _, err = fakeDelNeedle.Append(dst, v.Version())
+ _, _, _, err = fakeDelNeedle.Append(dst, v.Version())
if err != nil {
return fmt.Errorf("append deleted %d failed: %v", key, err)
}
@@ -269,7 +269,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, err := n.Append(dst, v.Version()); err != nil {
+ if _, _, _, err := n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize(version)
@@ -329,7 +329,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
if err = nm.Put(n.Id, Offset(new_offset/NeedlePaddingSize), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
- if _, _, err = n.Append(dst, v.Version()); err != nil {
+ if _, _, _, err = n.Append(dst, v.Version()); err != nil {
return fmt.Errorf("cannot append needle: %s", err)
}
new_offset += n.DiskSize(v.Version())
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index a5c2b2025..0bc24037d 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -122,7 +122,7 @@ func TestCompaction(t *testing.T) {
}
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
n := newRandomNeedle(uint64(i))
- size, err := v.writeNeedle(n)
+ _, size, err := v.writeNeedle(n)
if err != nil {
t.Fatalf("write file %d: %v", i, err)
}