aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-12-03 22:54:08 -0800
committerChris Lu <chris.lu@gmail.com>2012-12-03 22:54:08 -0800
commit6201ed537ef11f91c24c14612f1d087796e2d5f8 (patch)
treea6914928abba0707c80e14953b94dc8c36683947
parentb3df7673edbe369b964ee749da2c6d1e110b2f35 (diff)
downloadseaweedfs-6201ed537ef11f91c24c14612f1d087796e2d5f8.tar.xz
seaweedfs-6201ed537ef11f91c24c14612f1d087796e2d5f8.zip
reporting volume size as early as possible
-rw-r--r--weed-fs/src/cmd/weed/master.go3
-rw-r--r--weed-fs/src/cmd/weed/volume.go3
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go18
-rw-r--r--weed-fs/src/pkg/storage/store.go68
-rw-r--r--weed-fs/src/pkg/storage/volume.go5
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go15
6 files changed, 74 insertions, 38 deletions
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index 92d96718a..a70f8273e 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -114,6 +114,9 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
debug(s, "volumes", r.FormValue("volumes"))
topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount)
+ m := make(map[string]interface{})
+ m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB)*1024*1024
+ writeJson(w, r, m)
}
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 88841273a..f12f9f4e0 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -317,8 +317,9 @@ func runVolume(cmd *Command, args []string) bool {
go func() {
connected := true
+ store.SetMaster(*masterNode)
for {
- err := store.Join(*masterNode)
+ err := store.Join()
if err == nil {
if !connected {
connected = true
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index ac6998337..e01c27630 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -15,7 +15,8 @@ type NeedleMap struct {
deletionCounter int
fileCounter int
- deletionByteCounter uint32
+ deletionByteCounter uint64
+ fileByteCounter uint64
}
func NewNeedleMap(file *os.File) *NeedleMap {
@@ -44,19 +45,20 @@ func LoadNeedleMap(file *os.File) *NeedleMap {
key := util.BytesToUint64(bytes[i : i+8])
offset := util.BytesToUint32(bytes[i+8 : i+12])
size := util.BytesToUint32(bytes[i+12 : i+16])
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
if offset > 0 {
oldSize := nm.m.Set(Key(key), offset, size)
//log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- nm.fileCounter++
if oldSize > 0 {
nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + oldSize
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
}
} else {
nm.m.Delete(Key(key))
//log.Println("removing key", key)
nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + size
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
}
}
@@ -71,9 +73,10 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
util.Uint32toBytes(nm.bytes[8:12], offset)
util.Uint32toBytes(nm.bytes[12:16], size)
nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
if oldSize > 0 {
nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + oldSize
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
}
return nm.indexFile.Write(nm.bytes)
}
@@ -82,7 +85,7 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
return
}
func (nm *NeedleMap) Delete(key uint64) {
- nm.deletionByteCounter = nm.deletionByteCounter + nm.m.Delete(Key(key))
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0)
@@ -92,3 +95,6 @@ func (nm *NeedleMap) Delete(key uint64) {
func (nm *NeedleMap) Close() {
nm.indexFile.Close()
}
+func (nm *NeedleMap) ContentSize() uint64 {
+ return nm.fileByteCounter
+}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index 03d4357e4..abd336d5d 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -18,6 +18,10 @@ type Store struct {
Ip string
PublicUrl string
MaxVolumeCount int
+
+ //read from the master
+ masterNode string
+ volumeSizeLimit uint64
}
func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
@@ -70,15 +74,15 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
}
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
- }
- garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
- if e != nil {
- return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
- }
- return nil, garbageThreshold < s.volumes[vid].garbageLevel()
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
+ }
+ garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
+ if e != nil {
+ return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
+ }
+ return nil, garbageThreshold < s.volumes[vid].garbageLevel()
}
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
@@ -87,12 +91,12 @@ func (s *Store) CompactVolume(volumeIdString string) error {
}
return s.volumes[vid].compact()
}
-func (s *Store) CommitCompactVolume(volumeIdString string) (error) {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
- }
- return s.volumes[vid].commitCompact()
+func (s *Store) CommitCompactVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+ }
+ return s.volumes[vid].commitCompact()
}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
@@ -115,16 +119,24 @@ func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for k, v := range s.volumes {
s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
stats = append(stats, s)
}
return stats
}
-func (s *Store) Join(mserver string) error {
+
+type JoinResult struct {
+ VolumeSizeLimit uint64
+}
+
+func (s *Store) SetMaster(mserver string) {
+ s.masterNode = mserver
+}
+func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.Size(), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s.Id, s.Size, s.RepType, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), uint64(v.Size()), v.replicaType, v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
*stats = append(*stats, s)
}
bytes, _ := json.Marshal(stats)
@@ -134,8 +146,16 @@ func (s *Store) Join(mserver string) error {
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
- _, err := util.Post("http://"+mserver+"/dir/join", values)
- return err
+ jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
+ if err != nil {
+ return err
+ }
+ var ret JoinResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ s.volumeSizeLimit = ret.VolumeSizeLimit
+ return nil
}
func (s *Store) Close() {
for _, v := range s.volumes {
@@ -144,9 +164,13 @@ func (s *Store) Close() {
}
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
if v := s.volumes[i]; v != nil {
- return v.write(n)
+ size := v.write(n)
+ if s.volumeSizeLimit < v.ContentSize()+uint64(size) {
+ s.Join()
+ }
+ return size
}
- log.Println("volume",i, "not found!")
+ log.Println("volume", i, "not found!")
return 0
}
func (s *Store) Delete(i VolumeId, n *Needle) uint32 {
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 285356e26..f7314d3ed 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -132,7 +132,7 @@ func (v *Volume) read(n *Needle) (int, error) {
}
func (v *Volume) garbageLevel() float64 {
- return float64(v.nm.deletionByteCounter)/float64(v.Size())
+ return float64(v.nm.deletionByteCounter)/float64(v.ContentSize())
}
func (v *Volume) compact() error {
@@ -212,3 +212,6 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
return nil
}
+func (v *Volume) ContentSize() uint64{
+ return v.nm.fileByteCounter
+}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index dfedb0af3..060277e0a 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -1,13 +1,12 @@
package storage
-import (
-)
+import ()
type VolumeInfo struct {
- Id VolumeId
- Size int64
- RepType ReplicationType
- FileCount int
- DeleteCount int
- DeletedByteCount uint32
+ Id VolumeId
+ Size uint64
+ RepType ReplicationType
+ FileCount int
+ DeleteCount int
+ DeletedByteCount uint64
}