aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/pkg
diff options
context:
space:
mode:
authorchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2011-12-18 07:22:04 +0000
committerchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2011-12-18 07:22:04 +0000
commit2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8 (patch)
treef7f86a8facff5f27aab6f5754a533794087bf080 /weed-fs/src/pkg
parent1fed603df9c90dcfe64f8639949b07654ca3f718 (diff)
downloadseaweedfs-2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8.tar.xz
seaweedfs-2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8.zip
simplified to one machine per volume
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@14 282b0af5-e82d-9cf1-ede4-77906d7719d0
Diffstat (limited to 'weed-fs/src/pkg')
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go131
-rw-r--r--weed-fs/src/pkg/storage/store.go14
-rw-r--r--weed-fs/src/pkg/storage/volume.go6
3 files changed, 90 insertions, 61 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index e9a0647e5..4062b3109 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -7,24 +7,36 @@ import (
"rand"
"log"
"storage"
+ "sync"
+)
+
+const (
+ ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8
)
type Machine struct {
Server string //<server name/ip>[:port]
PublicServer string
- CanWrite bool
+ Volumes []storage.VolumeInfo
+ Capacity int
}
+
type Mapper struct {
dir string
fileName string
capacity int
- Machines [][]Machine //initial version only support one copy per machine
- writers [][]Machine // transient value to lookup writers fast
+
+ lock sync.Mutex
+ Machines []*Machine
+ vid2machineId map[uint64]int
+ writers []int // transient array of writers volume id
+
+ GlobalVolumeSequence uint64
}
-func NewMachine(server, publicServer string) (m *Machine) {
+func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) {
m = new(Machine)
- m.Server, m.PublicServer = server, publicServer
+ m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity
return
}
@@ -33,74 +45,90 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) {
m.dir, m.fileName, m.capacity = dirname, filename, capacity
log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map"))
dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644)
- m.Machines = *new([][]Machine)
- m.writers = *new([][]Machine)
+ m.vid2machineId = make(map[uint64]int)
+ m.writers = *new([]int)
if e != nil {
log.Println("Mapping File Read", e)
+ m.Machines = *new([]*Machine)
} else {
decoder := gob.NewDecoder(dataFile)
- defer dataFile.Close()
+ defer dataFile.Close()
decoder.Decode(&m.Machines)
- for _, list := range m.Machines {
- //TODO: what if a list has mixed readers and writers? Now it's treated as readonly
- allCanWrite := false
- for _, entry := range list {
- allCanWrite = allCanWrite && entry.CanWrite
- }
- if allCanWrite {
- m.writers = append(m.writers, list)
+ decoder.Decode(&m.GlobalVolumeSequence)
+
+ //add to vid2machineId map, and writers array
+ for machine_index, machine := range m.Machines {
+ for _, v := range machine.Volumes {
+ m.vid2machineId[v.Id] = machine_index
+ if v.Size < ChunkSizeLimit {
+ m.writers = append(m.writers, machine_index)
+ }
}
}
- log.Println("Loaded mapping size", len(m.Machines))
+ log.Println("Loaded mapping size", len(m.Machines))
}
return
}
-func (m *Mapper) PickForWrite() []Machine {
- vid := rand.Intn(len(m.Machines))
- return m.Machines[vid]
+func (m *Mapper) PickForWrite() *Machine {
+ vid := rand.Intn(len(m.writers))
+ return m.Machines[m.writers[vid]]
}
-func (m *Mapper) Get(vid int) []Machine {
+func (m *Mapper) Get(vid int) *Machine {
return m.Machines[vid]
}
-func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int {
- log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines))
- log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines))
- maxId := len(m.Machines)-1
- for _, v := range volumes {
- if maxId < int(v.Id) {
- maxId = int(v.Id)
+func (m *Mapper) Add(machine Machine) []uint64 {
+ log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines))
+ log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines))
+ //check existing machine, linearly
+ m.lock.Lock()
+ foundExistingMachineId := -1
+ for index, entry := range m.Machines {
+ if machine.Server == entry.Server {
+ foundExistingMachineId = index
+ break
}
}
- for i := len(m.Machines); i <= maxId; i++ {
- m.Machines = append(m.Machines, nil)
- }
- log.Println("Machine list now is", len(m.Machines))
- for _, v := range volumes {
- found := false
- existing := m.Machines[v.Id]
- for _, entry := range existing {
- if machine.Server == entry.Server {
- found = true
- break
- }
- }
- if !found {
- m.Machines[v.Id] = append(existing, machine)
- log.Println("Setting volume", v.Id, "to", machine.Server)
- }
+ machineId := foundExistingMachineId
+ if machineId < 0 {
+ machineId = len(m.Machines)
+ m.Machines = append(m.Machines, &machine)
}
- vids := new([]int)
- for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 {
- list := new([]Machine)
- *list = append(*list, machine)
- m.Machines = append(m.Machines, *list)
+ //generate new volumes
+ vids := new([]uint64)
+ for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 {
+ newVolume := *new(storage.VolumeInfo)
+ newVolume.Id, newVolume.Size = vid, 0
+ machine.Volumes = append(machine.Volumes, newVolume)
+ m.vid2machineId[vid] = machineId
log.Println("Adding volume", vid, "from", machine.Server)
*vids = append(*vids, vid)
+ m.GlobalVolumeSequence = vid + 1
}
m.Save()
- log.Println("Dir size =>", len(m.Machines))
+ m.lock.Unlock()
+
+ //add to vid2machineId map, and writers array
+ for _, v := range machine.Volumes {
+ log.Println("Setting volume", v.Id, "to", machine.Server)
+ m.vid2machineId[v.Id] = machineId
+ if v.Size < ChunkSizeLimit {
+ m.writers = append(m.writers, machineId)
+ }
+ }
+ //setting writers, copy-on-write because of possible updating
+ var writers []int
+ for machine_index, machine_entry := range m.Machines {
+ for _, v := range machine_entry.Volumes {
+ if v.Size < ChunkSizeLimit {
+ writers = append(writers, machine_index)
+ }
+ }
+ }
+ m.writers = writers
+
+ log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.writers))
return *vids
}
func (m *Mapper) Save() {
@@ -112,4 +140,5 @@ func (m *Mapper) Save() {
defer dataFile.Close()
encoder := gob.NewEncoder(dataFile)
encoder.Encode(m.Machines)
+ encoder.Encode(m.GlobalVolumeSequence)
}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index e13f47257..ff6944853 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -16,9 +16,9 @@ type Store struct {
Port int
PublicServer string
}
-type VolumeStat struct {
- Id uint64 "id"
- CanWrite bool
+type VolumeInfo struct {
+ Id uint64
+ Size int64
}
func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) {
@@ -44,10 +44,10 @@ func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (
}
func (s *Store) Join(mserver string) {
- stats := new([]*VolumeStat)
- for k, _ := range s.volumes {
- s := new(VolumeStat)
- s.Id, s.CanWrite = k, true
+ stats := new([]*VolumeInfo)
+ for k, v := range s.volumes {
+ s := new(VolumeInfo)
+ s.Id, s.Size = k, v.Size()
*stats = append(*stats, s)
}
bytes, _ := json.Marshal(stats)
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 3acd2d0e3..4b7f6ab3a 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -38,12 +38,12 @@ func NewVolume(dirname string, id uint64) (v *Volume) {
return
}
-func (v *Volume) CanWrite(limit int64) bool {
+func (v *Volume) Size() int64 {
stat, e:=v.dataFile.Stat()
if e!=nil{
- return stat.Size < limit
+ return stat.Size
}
- return false
+ return -1
}
func (v *Volume) Close() {
close(v.accessChannel)