aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-11 21:39:16 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-11 21:39:16 -0700
commite4af63a721f62c280596f626302b64729baeaee9 (patch)
tree06c863d0362c23d53efdc053ab7750bf88e7feb6 /weed/storage
parent5987810e5eb56611389451029fb85ff1fd700996 (diff)
downloadseaweedfs-e4af63a721f62c280596f626302b64729baeaee9.tar.xz
seaweedfs-e4af63a721f62c280596f626302b64729baeaee9.zip
volume server: accept fsync=true in write requests
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/store.go4
-rw-r--r--weed/storage/volume_read_write.go7
-rw-r--r--weed/storage/volume_vacuum_test.go2
3 files changed, 9 insertions, 4 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d6b623e63..64e437add 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -252,7 +252,7 @@ func (s *Store) Close() {
}
}
-func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) {
+func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
if v.IsReadOnly() {
err = fmt.Errorf("volume %d is read only", i)
@@ -260,7 +260,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchan
}
// using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
- _, _, isUnchanged, err = v.writeNeedle(n)
+ _, _, isUnchanged, err = v.writeNeedle(n, fsync)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index ac6154cef..bb0421724 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -63,7 +63,7 @@ func (v *Volume) Destroy() (err error) {
return
}
-func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
+func (v *Volume) writeNeedle(n *needle.Needle, fsync bool) (offset uint64, size uint32, isUnchanged bool, err error) {
// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@@ -98,6 +98,11 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
return
}
+ if fsync {
+ if err = v.DataBackend.Sync(); err != nil {
+ return
+ }
+ }
v.lastAppendAtNs = n.AppendAtNs
// add to needle map
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 51f04c8b1..5d0fcbe31 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) {
}
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
n := newRandomNeedle(uint64(i))
- _, size, _, err := v.writeNeedle(n)
+ _, size, _, err := v.writeNeedle(n, false)
if err != nil {
t.Fatalf("write file %d: %v", i, err)
}