aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTamás Gulácsi <tgulacsi78+waterhouse@gmail.com>2013-01-13 17:07:38 +0100
committerTamás Gulácsi <tgulacsi78+waterhouse@gmail.com>2013-01-13 17:07:38 +0100
commitdd685fdd8d8ac6d28dce0d25b72115e3315a30a8 (patch)
treea6036a2cad8e389bc47b7d769fb47dae8824b81a
parentbf0ccf346198a65e0321b3cedfb25ef5dad73e2a (diff)
downloadseaweedfs-dd685fdd8d8ac6d28dce0d25b72115e3315a30a8.tar.xz
seaweedfs-dd685fdd8d8ac6d28dce0d25b72115e3315a30a8.zip
add Frozen attribute to VolumeInfo
-rw-r--r--weed-fs/src/cmd/weed/master.go10
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go16
-rw-r--r--weed-fs/src/pkg/storage/store.go17
-rw-r--r--weed-fs/src/pkg/storage/volume.go13
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go1
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go5
6 files changed, 55 insertions, 7 deletions
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index c60974a67..151ae31fc 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
if ip == "" {
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
}
- port, _ := strconv.Atoi(r.FormValue("port"))
- maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ port, err := strconv.Atoi(r.FormValue("port"))
+ if err != nil {
+ log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err)
+ }
+ maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ if err != nil {
+ log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err)
+ }
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index 9d7369509..f103d10d8 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -1,6 +1,7 @@
package storage
import (
+ "errors"
"io"
"log"
"os"
@@ -60,10 +61,16 @@ func NewFrozenNeedleMap(file *os.File) (*NeedleMap, error) {
}, nil
}
+func (nm NeedleMap) IsFrozen() bool {
+ return nm.m == nil && nm.fm != nil
+}
+
const (
RowsToRead = 1024
)
+var MapIsFrozen = errors.New("Map is frozen!")
+
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
@@ -124,6 +131,9 @@ func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
+ if nm.IsFrozen() {
+ return 0, MapIsFrozen
+ }
oldSize := nm.m.Set(Key(key), offset, size)
util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], offset)
@@ -140,13 +150,17 @@ func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
element, ok = nm.m.Get(Key(key))
return
}
-func (nm *NeedleMap) Delete(key uint64) {
+func (nm *NeedleMap) Delete(key uint64) error {
+ if nm.IsFrozen() {
+ return MapIsFrozen
+ }
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0)
nm.indexFile.Write(nm.bytes)
nm.deletionCounter++
+ return nil
}
func (nm *NeedleMap) Close() {
nm.indexFile.Close()
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index 79cf65e28..b06c29902 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -120,8 +120,16 @@ func (s *Store) loadExistingVolumes() {
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s := &VolumeInfo{
+ Id: VolumeId(k),
+ Size: v.ContentSize(),
+ RepType: v.replicaType,
+ Version: v.Version(),
+ FileCount: v.nm.fileCounter,
+ DeleteCount: v.nm.deletionCounter,
+ DeletedByteCount: v.nm.deletionByteCounter,
+ Frozen: !v.IsWritable(),
+ }
stats = append(stats, s)
}
return stats
@@ -134,6 +142,8 @@ type JoinResult struct {
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
+
+// call master's /dir/join
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
@@ -171,7 +181,8 @@ func (s *Store) Close() {
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
if v := s.volumes[i]; v != nil {
size := v.write(n)
- if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
+ if s.volumeSizeLimit < v.ContentSize()+uint64(size) &&
+ s.volumeSizeLimit >= v.ContentSize() {
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
s.Join()
}
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 71dfb5aee..6a79d6c40 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -3,6 +3,7 @@ package storage
import (
"errors"
"fmt"
+ "log"
"os"
"path"
"sync"
@@ -64,6 +65,18 @@ func (v *Volume) Size() int64 {
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
return -1
}
+
+// a volume is writable, if its data file is writable and the index is not frozen
+func (v *Volume) IsWritable() bool {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error())
+ return false
+ }
+ // 4 for r, 2 for w, 1 for x
+ return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen()
+}
+
func (v *Volume) Close() {
v.accessLock.Lock()
defer v.accessLock.Unlock()
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index e4c5f6ec4..845301670 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -10,4 +10,5 @@ type VolumeInfo struct {
FileCount int
DeleteCount int
DeletedByteCount uint64
+ Frozen bool
}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
index 23802ca81..141a40072 100644
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -45,7 +45,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
- return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
+ return !v.Frozen &&
+ uint64(v.Size) < vl.volumeSizeLimit &&
+ v.Version == storage.CurrentVersion
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
@@ -92,6 +94,7 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return false
}
}
+ // FIXME: how to refuse if volume is unwritable/frozen?
fmt.Println("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid)
if len(vl.writables) > 1 {