aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxushuxun <xushuxun@outlook.com>2019-08-15 11:20:01 +0800
committerGitHub <noreply@github.com>2019-08-15 11:20:01 +0800
commit3d01510b024d9e2058db7870bf158d4d9278b005 (patch)
tree95c19d4a13e27e681287b27c268c81df834d9f58
parente40634e6b49e8ab20720c7b8625b9333bf2bab37 (diff)
parentd829df4f5964e6c0397fcba30554f7b5e4875ea8 (diff)
downloadseaweedfs-3d01510b024d9e2058db7870bf158d4d9278b005.tar.xz
seaweedfs-3d01510b024d9e2058db7870bf158d4d9278b005.zip
Merge pull request #1 from chrislusf/master
volume: protect against nil needle map
-rw-r--r--weed/storage/needle_map_metric.go30
-rw-r--r--weed/storage/store.go10
-rw-r--r--weed/storage/volume.go56
-rw-r--r--weed/storage/volume_backup.go5
-rw-r--r--weed/storage/volume_read_write.go4
-rw-r--r--weed/storage/volume_vacuum.go4
6 files changed, 92 insertions, 17 deletions
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index 6448b053b..823a04108 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -19,10 +19,16 @@ type mapMetric struct {
}
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+ if mm == nil {
+ return
+ }
mm.LogDeletionCounter(deletedByteCount)
}
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
+ if mm == nil {
+ return
+ }
mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize)
if oldSize > 0 && oldSize != TombstoneFileSize {
@@ -30,32 +36,56 @@ func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
}
}
func (mm *mapMetric) LogFileCounter(newSize uint32) {
+ if mm == nil {
+ return
+ }
atomic.AddUint32(&mm.FileCounter, 1)
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
}
func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
+ if mm == nil {
+ return
+ }
if oldSize > 0 {
atomic.AddUint32(&mm.DeletionCounter, 1)
atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
}
}
func (mm *mapMetric) ContentSize() uint64 {
+ if mm == nil {
+ return 0
+ }
return atomic.LoadUint64(&mm.FileByteCounter)
}
func (mm *mapMetric) DeletedSize() uint64 {
+ if mm == nil {
+ return 0
+ }
return atomic.LoadUint64(&mm.DeletionByteCounter)
}
func (mm *mapMetric) FileCount() int {
+ if mm == nil {
+ return 0
+ }
return int(atomic.LoadUint32(&mm.FileCounter))
}
func (mm *mapMetric) DeletedCount() int {
+ if mm == nil {
+ return 0
+ }
return int(atomic.LoadUint32(&mm.DeletionCounter))
}
func (mm *mapMetric) MaxFileKey() NeedleId {
+ if mm == nil {
+ return 0
+ }
t := uint64(mm.MaximumFileKey)
return Uint64ToNeedleId(t)
}
func (mm *mapMetric) MaybeSetMaxFileKey(key NeedleId) {
+ if mm == nil {
+ return
+ }
if key > mm.MaxFileKey() {
atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index ac13f6a28..f0dc90790 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -137,9 +137,9 @@ func (s *Store) Status() []*VolumeInfo {
Collection: v.Collection,
ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
- FileCount: v.nm.FileCount(),
- DeleteCount: v.nm.DeletedCount(),
- DeletedByteCount: v.nm.DeletedSize(),
+ FileCount: int(v.FileCount()),
+ DeleteCount: int(v.DeletedCount()),
+ DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly,
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
@@ -168,8 +168,8 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
location.Lock()
for _, v := range location.volumes {
- if maxFileKey < v.nm.MaxFileKey() {
- maxFileKey = v.nm.MaxFileKey()
+ if maxFileKey < v.MaxFileKey() {
+ maxFileKey = v.MaxFileKey()
}
if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index a5e923547..a2e34bd04 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"path"
@@ -85,14 +86,54 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
return // -1 causes integer overflow and the volume to become unwritable.
}
-func (v *Volume) IndexFileSize() uint64 {
- return v.nm.IndexFileSize()
+func (v *Volume) ContentSize() uint64 {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.ContentSize()
+}
+
+func (v *Volume) DeletedSize() uint64 {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.DeletedSize()
}
func (v *Volume) FileCount() uint64 {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
return uint64(v.nm.FileCount())
}
+func (v *Volume) DeletedCount() uint64 {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return uint64(v.nm.DeletedCount())
+}
+
+func (v *Volume) MaxFileKey() types.NeedleId {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.MaxFileKey()
+}
+
+func (v *Volume) IndexFileSize() uint64 {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.IndexFileSize()
+}
+
+func (v *Volume) IndexFileContent() ([]byte, error) {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.IndexFileContent()
+}
+
+func (v *Volume) IndexFileName() string {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ return v.nm.IndexFileName()
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -112,10 +153,6 @@ func (v *Volume) NeedToReplicate() bool {
return v.ReplicaPlacement.GetCopyCount() > 1
}
-func (v *Volume) ContentSize() uint64 {
- return v.nm.ContentSize()
-}
-
// volume is expired if modified time + volume ttl < now
// except when volume is empty
// or when the volume does not have a ttl
@@ -158,13 +195,14 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, modTime := v.FileStat()
+
return &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
Size: size,
Collection: v.Collection,
- FileCount: uint64(v.nm.FileCount()),
- DeleteCount: uint64(v.nm.DeletedCount()),
- DeletedByteCount: v.nm.DeletedSize(),
+ FileCount: uint64(v.FileCount()),
+ DeleteCount: uint64(v.DeletedCount()),
+ DeletedByteCount: v.DeletedSize(),
ReadOnly: v.readOnly,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index f56c40019..e10990a9b 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -15,12 +15,15 @@ import (
)
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
if stat, err := v.dataFile.Stat(); err == nil {
syncStatus.TailOffset = uint64(stat.Size())
}
syncStatus.Collection = v.Collection
- syncStatus.IdxFileSize = v.nm.IndexFileSize()
+ syncStatus.IdxFileSize = v.IndexFileSize()
syncStatus.CompactRevision = uint32(v.SuperBlock.CompactionRevision)
syncStatus.Ttl = v.SuperBlock.Ttl.String()
syncStatus.Replication = v.SuperBlock.ReplicaPlacement.String()
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 2c67b2dc4..ae05331a4 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -21,6 +21,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
if v.Ttl.String() != "" {
return false
}
+
nv, ok := v.nm.Get(n.Id)
if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize {
oldNeedle := new(needle.Needle)
@@ -138,6 +139,9 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
return -1, ErrorNotFound
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index ff09df42d..c021c4c18 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -18,7 +18,7 @@ func (v *Volume) garbageLevel() float64 {
if v.ContentSize() == 0 {
return 0
}
- return float64(v.nm.DeletedSize()) / float64(v.ContentSize())
+ return float64(v.DeletedSize()) / float64(v.ContentSize())
}
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
@@ -33,7 +33,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
}()
filePath := v.FileName()
- v.lastCompactIndexOffset = v.nm.IndexFileSize()
+ v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)