diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/erasure_coding/ec_shard.go | 2 | ||||
| -rw-r--r-- | weed/storage/store.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_read_write.go | 19 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 2 |
4 files changed, 22 insertions, 5 deletions
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index b280157b8..47e6d3d1e 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -81,7 +81,7 @@ func (shard *EcVolumeShard) Close() { func (shard *EcVolumeShard) Destroy() { os.Remove(shard.FileName() + ToExt(int(shard.ShardId))) - stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec() } func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) { diff --git a/weed/storage/store.go b/weed/storage/store.go index d2dd76d52..e99d77774 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -211,7 +211,7 @@ func (s *Store) Close() { } } -func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { if v.readOnly { err = fmt.Errorf("volume %d is read only", i) @@ -230,7 +230,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUncha return } -func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) { +func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) { if v := s.findVolume(i); v != nil && !v.readOnly { return v.deleteNeedle(n) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 93ce1eab9..0327e5a1f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -96,13 +96,29 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn n.Ttl = v.Ttl } + // check whether existing needle cookie matches + nv, ok := v.nm.Get(n.Id) + if ok { + existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset()) + if existingNeedleReadErr != nil { + err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) + return + } + if existingNeedle.Cookie != n.Cookie { + glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) + err = fmt.Errorf("mismatching cookie %x", n.Cookie) + return + } + } + + // append to dat file n.AppendAtNs = uint64(time.Now().UnixNano()) if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil { return } v.lastAppendAtNs = n.AppendAtNs - nv, ok := v.nm.Get(n.Id) + // add to needle map if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) @@ -224,6 +240,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, } if err != nil { glog.V(0).Infof("visit needle error: %v", err) + return fmt.Errorf("visit needle error: %v", err) } offset += NeedleHeaderSize + rest glog.V(4).Infof("==> new entry offset %d", offset) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index cc7c7d6e6..3bb306649 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -53,7 +53,7 @@ func (v *Volume) CommitCompact() error { glog.V(0).Infof("fail to close volume %d", v.Id) } v.dataFile = nil - stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc() + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { |
