aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordsd <60881537+dsd2077@users.noreply.github.com>2024-10-18 16:20:50 +0800
committerGitHub <noreply@github.com>2024-10-18 01:20:50 -0700
commit1e13b6879cb476b07255f59dbfa97a3e30fad698 (patch)
tree0c02db026c80f068c05d1631eea1979b34fee38b
parentf11567816e4412c39839e5d4d8caa793a2a14bf4 (diff)
downloadseaweedfs-1e13b6879cb476b07255f59dbfa97a3e30fad698.tar.xz
seaweedfs-1e13b6879cb476b07255f59dbfa97a3e30fad698.zip
fix(volume): to avoid duplicate write a same needle (#6138)
fix WriteNeedleBlob to avoid duplicate write a same needle Co-authored-by: 邓书东 <shudong_deng@hhnb2024010108.intsig.com> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
-rw-r--r--weed/storage/volume_write.go16
1 files changed, 15 insertions, 1 deletions
diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go
index 8b731e028..434e296d4 100644
--- a/weed/storage/volume_write.go
+++ b/weed/storage/volume_write.go
@@ -4,11 +4,12 @@ import (
"bytes"
"errors"
"fmt"
+ "os"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "os"
)
var ErrorNotFound = errors.New("not found")
@@ -323,6 +324,19 @@ func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size
return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
}
+ nv, ok := v.nm.Get(needleId)
+ if ok && nv.Size == size {
+ oldNeedle := new(needle.Needle)
+ err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
+ if err == nil {
+ newNeedle := new(needle.Needle)
+ err = newNeedle.ReadBytes(needleBlob, nv.Offset.ToActualOffset(), size, v.Version())
+ if err == nil && oldNeedle.Cookie == newNeedle.Cookie && oldNeedle.Checksum == newNeedle.Checksum && bytes.Equal(oldNeedle.Data, newNeedle.Data) {
+ glog.V(0).Infof("needle %v already exists", needleId)
+ return nil
+ }
+ }
+ }
appendAtNs := needle.GetAppendAtNs(v.lastAppendAtNs)
offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())