aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-06-06 01:35:48 -0700
committerchrislu <chris.lu@gmail.com>2025-06-06 01:35:48 -0700
commit7439af0eca7cb0f3914dce970fa542a08e35269a (patch)
tree0e52dcc50be020ae545768df5569db38cc52e454
parentcc135c63f7638b12a9e6a4b460fc858b412cf019 (diff)
downloadseaweedfs-7439af0eca7cb0f3914dce970fa542a08e35269a.tar.xz
seaweedfs-7439af0eca7cb0f3914dce970fa542a08e35269a.zip
refactoring
-rw-r--r--weed/storage/needle/needle_write.go51
-rw-r--r--weed/storage/needle/needle_write_v1.go25
-rw-r--r--weed/storage/needle/needle_write_v2.go25
-rw-r--r--weed/storage/needle/needle_write_v3.go25
4 files changed, 54 insertions, 72 deletions
diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go
index 2364d2589..5fe9f6a79 100644
--- a/weed/storage/needle/needle_write.go
+++ b/weed/storage/needle/needle_write.go
@@ -1,23 +1,44 @@
package needle
import (
+ "bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
+ end, _, e := w.GetStat()
+ if e != nil {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
+ return
+ }
+ offset = uint64(end)
+ if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
+ err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
+ return
+ }
+ bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+ defer func() {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ // handle error or log
+ }
+ }
+ buffer_pool.SyncPoolPutBuffer(bytesBuffer)
+ }()
switch version {
case Version1:
- return writeNeedleV1(w, n)
+ return writeNeedleV1(w, n, offset, bytesBuffer)
case Version2:
- return writeNeedleV2(w, n)
+ return writeNeedleV2(w, n, offset, bytesBuffer)
case Version3:
- return writeNeedleV3(w, n)
+ return writeNeedleV3(w, n, offset, bytesBuffer)
default:
err = fmt.Errorf("unsupported version: %d", version)
return
@@ -52,3 +73,27 @@ func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size,
return
}
+
+// prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions.
+func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) {
+ end, _, e := w.GetStat()
+ if e != nil {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
+ return
+ }
+ offset = uint64(end)
+ if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
+ err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
+ return
+ }
+ bytesBuffer = buffer_pool.SyncPoolGetBuffer()
+ cleanup = func(err error) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ // handle error or log
+ }
+ }
+ buffer_pool.SyncPoolPutBuffer(bytesBuffer)
+ }
+ return
+}
diff --git a/weed/storage/needle/needle_write_v1.go b/weed/storage/needle/needle_write_v1.go
index a29c925f8..db912ee0f 100644
--- a/weed/storage/needle/needle_write_v1.go
+++ b/weed/storage/needle/needle_write_v1.go
@@ -1,36 +1,15 @@
package needle
import (
+ "bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
-func writeNeedleV1(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
- if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.BackendStorageFile, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- // handle error
- }
- }
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
- return
- }
- if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
- err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
- return
- }
-
- bytesBuffer := buffer_pool.SyncPoolGetBuffer()
- defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
-
+func writeNeedleV1(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
diff --git a/weed/storage/needle/needle_write_v2.go b/weed/storage/needle/needle_write_v2.go
index fac50cb4f..d332bc1c2 100644
--- a/weed/storage/needle/needle_write_v2.go
+++ b/weed/storage/needle/needle_write_v2.go
@@ -1,37 +1,16 @@
package needle
import (
+ "bytes"
"fmt"
"math"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
-func writeNeedleV2(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
- if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.BackendStorageFile, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- // handle error
- }
- }
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
- return
- }
- if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
- err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
- return
- }
-
- bytesBuffer := buffer_pool.SyncPoolGetBuffer()
- defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
-
+func writeNeedleV2(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize+TimestampSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
diff --git a/weed/storage/needle/needle_write_v3.go b/weed/storage/needle/needle_write_v3.go
index 5e3ecfb4a..6927a1431 100644
--- a/weed/storage/needle/needle_write_v3.go
+++ b/weed/storage/needle/needle_write_v3.go
@@ -1,37 +1,16 @@
package needle
import (
+ "bytes"
"fmt"
"math"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
-func writeNeedleV3(w backend.BackendStorageFile, n *Needle) (offset uint64, size Size, actualSize int64, err error) {
- if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.BackendStorageFile, off int64) {
- if err != nil {
- if te := w.Truncate(end); te != nil {
- // handle error
- }
- }
- }(w, end)
- offset = uint64(end)
- } else {
- err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
- return
- }
- if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
- err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
- return
- }
-
- bytesBuffer := buffer_pool.SyncPoolGetBuffer()
- defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
-
+func writeNeedleV3(w backend.BackendStorageFile, n *Needle, offset uint64, bytesBuffer *bytes.Buffer) (offsetOut uint64, size Size, actualSize int64, err error) {
bytesBuffer.Reset()
header := make([]byte, NeedleHeaderSize+TimestampSize)
CookieToBytes(header[0:CookieSize], n.Cookie)