aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-14 23:00:37 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-14 23:00:37 -0700
commit3e8a3a8fec6df5b39b6b2b4603df8bc183d90aa8 (patch)
tree8ce00d95e2f923b6b7d82fe15ba9793502815e8d
parent26aaccca088e099ea87d2ba60cfbdbc2e7f3d77f (diff)
downloadseaweedfs-3e8a3a8fec6df5b39b6b2b4603df8bc183d90aa8.tar.xz
seaweedfs-3e8a3a8fec6df5b39b6b2b4603df8bc183d90aa8.zip
fix race detector found problems
-rw-r--r--weed/server/volume_grpc_client_to_master.go5
-rw-r--r--weed/storage/needle_map_memory.go6
-rw-r--r--weed/storage/needle_map_metric.go57
-rw-r--r--weed/storage/store.go18
4 files changed, 53 insertions, 33 deletions
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 94e99c8f6..cf01b5bd8 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,10 +2,11 @@ package weed_server
import (
"fmt"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -75,7 +76,7 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
return
}
if in.GetVolumeSizeLimit() != 0 {
- vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
+ vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
}
if in.GetLeader() != "" && masterNode != in.GetLeader() {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index ad3bd3f7a..a3b574324 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -47,9 +47,7 @@ func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
- if key > nm.MaximumFileKey {
- nm.MaximumFileKey = key
- }
+ nm.MaybeSetMaxFileKey(key)
if !offset.IsZero() && size != TombstoneFileSize {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
@@ -67,7 +65,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
}
return nil
})
- glog.V(1).Infof("max file key: %d for file: %s", nm.MaximumFileKey, file.Name())
+ glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name())
return nm, e
}
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index cc3d9e028..0e2e16964 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -2,52 +2,65 @@ package storage
import (
"fmt"
+ "os"
+ "sync/atomic"
+
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/willf/bloom"
- "os"
)
type mapMetric struct {
- DeletionCounter int `json:"DeletionCounter"`
- FileCounter int `json:"FileCounter"`
- DeletionByteCounter uint64 `json:"DeletionByteCounter"`
- FileByteCounter uint64 `json:"FileByteCounter"`
- MaximumFileKey NeedleId `json:"MaxFileKey"`
+ DeletionCounter uint32 `json:"DeletionCounter"`
+ FileCounter uint32 `json:"FileCounter"`
+ DeletionByteCounter uint64 `json:"DeletionByteCounter"`
+ FileByteCounter uint64 `json:"FileByteCounter"`
+ MaximumFileKey uint64 `json:"MaxFileKey"`
}
func (mm *mapMetric) logDelete(deletedByteCount uint32) {
- mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(deletedByteCount)
- mm.DeletionCounter++
+ mm.LogDeletionCounter(deletedByteCount)
}
func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
- if key > mm.MaximumFileKey {
- mm.MaximumFileKey = key
+ mm.MaybeSetMaxFileKey(key)
+ mm.LogFileCounter(newSize)
+ if oldSize > 0 && oldSize != TombstoneFileSize {
+ mm.LogDeletionCounter(oldSize)
}
- mm.FileCounter++
- mm.FileByteCounter = mm.FileByteCounter + uint64(newSize)
+}
+func (mm mapMetric) LogFileCounter(newSize uint32) {
+ atomic.AddUint32(&mm.FileCounter, 1)
+ atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
+}
+func (mm mapMetric) LogDeletionCounter(oldSize uint32) {
if oldSize > 0 {
- mm.DeletionCounter++
- mm.DeletionByteCounter = mm.DeletionByteCounter + uint64(oldSize)
+ atomic.AddUint32(&mm.DeletionCounter, 1)
+ atomic.AddUint64(&mm.DeletionByteCounter, uint64(oldSize))
}
}
-
func (mm mapMetric) ContentSize() uint64 {
- return mm.FileByteCounter
+ return atomic.LoadUint64(&mm.FileByteCounter)
}
func (mm mapMetric) DeletedSize() uint64 {
- return mm.DeletionByteCounter
+ return atomic.LoadUint64(&mm.DeletionByteCounter)
}
func (mm mapMetric) FileCount() int {
- return mm.FileCounter
+ return int(atomic.LoadUint32(&mm.FileCounter))
}
func (mm mapMetric) DeletedCount() int {
- return mm.DeletionCounter
+ return int(atomic.LoadUint32(&mm.DeletionCounter))
}
func (mm mapMetric) MaxFileKey() NeedleId {
- return mm.MaximumFileKey
+ t := uint64(mm.MaximumFileKey)
+ return NeedleId(t)
+}
+func (mm mapMetric) MaybeSetMaxFileKey(key NeedleId) {
+ if key > mm.MaxFileKey() {
+ atomic.StoreUint64(&mm.MaximumFileKey, uint64(key))
+ }
}
+
func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
mm = &mapMetric{}
var bf *bloom.BloomFilter
@@ -56,9 +69,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
}, func(key NeedleId, offset Offset, size uint32) error {
- if key > mm.MaximumFileKey {
- mm.MaximumFileKey = key
- }
+ mm.MaybeSetMaxFileKey(key)
NeedleIdToBytes(buf, key)
if size != TombstoneFileSize {
mm.FileByteCounter += uint64(size)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 56e973738..d866d2e11 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,6 +2,8 @@ package storage
import (
"fmt"
+ "sync/atomic"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -22,7 +24,7 @@ type Store struct {
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
- VolumeSizeLimit uint64 //read from the master
+ volumeSizeLimit uint64 //read from the master
Client master_pb.Seaweed_SendHeartbeatClient
NeedleMapType NeedleMapType
NewVolumeIdChan chan VolumeId
@@ -30,7 +32,7 @@ type Store struct {
}
func (s *Store) String() (str string) {
- str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
+ str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
return
}
@@ -150,7 +152,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
- if !v.expired(s.VolumeSizeLimit) {
+ if !v.expired(s.GetVolumeSizeLimit()) {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
@@ -192,7 +194,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
_, size, err = v.writeNeedle(n)
} else {
- err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
+ err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
return
}
@@ -255,3 +257,11 @@ func (s *Store) DeleteVolume(i VolumeId) error {
return fmt.Errorf("Volume %d not found on disk", i)
}
+
+func (s *Store) SetVolumeSizeLimit(x uint64) {
+ atomic.StoreUint64(&s.volumeSizeLimit, x)
+}
+
+func (s *Store) GetVolumeSizeLimit() uint64 {
+ return atomic.LoadUint64(&s.volumeSizeLimit)
+}