diff options
| author | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-18 07:22:04 +0000 |
|---|---|---|
| committer | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-18 07:22:04 +0000 |
| commit | 2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8 (patch) | |
| tree | f7f86a8facff5f27aab6f5754a533794087bf080 /weed-fs/src/pkg | |
| parent | 1fed603df9c90dcfe64f8639949b07654ca3f718 (diff) | |
| download | seaweedfs-2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8.tar.xz seaweedfs-2c7a4eea1fd8874a2dfd4a28d6fa29e507fb0fe8.zip | |
simplified to one machine per volume
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@14 282b0af5-e82d-9cf1-ede4-77906d7719d0
Diffstat (limited to 'weed-fs/src/pkg')
| -rw-r--r-- | weed-fs/src/pkg/directory/volume_mapping.go | 131 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/store.go | 14 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/volume.go | 6 |
3 files changed, 90 insertions, 61 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index e9a0647e5..4062b3109 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -7,24 +7,36 @@ import ( "rand" "log" "storage" + "sync" +) + +const ( + ChunkSizeLimit = 1 * 1024 * 1024 * 1024 //1G, can not be more than max(uint32)*8 ) type Machine struct { Server string //<server name/ip>[:port] PublicServer string - CanWrite bool + Volumes []storage.VolumeInfo + Capacity int } + type Mapper struct { dir string fileName string capacity int - Machines [][]Machine //initial version only support one copy per machine - writers [][]Machine // transient value to lookup writers fast + + lock sync.Mutex + Machines []*Machine + vid2machineId map[uint64]int + writers []int // transient array of writers volume id + + GlobalVolumeSequence uint64 } -func NewMachine(server, publicServer string) (m *Machine) { +func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) (m *Machine) { m = new(Machine) - m.Server, m.PublicServer = server, publicServer + m.Server, m.PublicServer, m.Volumes, m.Capacity = server, publicServer, volumes, capacity return } @@ -33,74 +45,90 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { m.dir, m.fileName, m.capacity = dirname, filename, capacity log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) - m.Machines = *new([][]Machine) - m.writers = *new([][]Machine) + m.vid2machineId = make(map[uint64]int) + m.writers = *new([]int) if e != nil { log.Println("Mapping File Read", e) + m.Machines = *new([]*Machine) } else { decoder := gob.NewDecoder(dataFile) - defer dataFile.Close() + defer dataFile.Close() decoder.Decode(&m.Machines) - for _, list := range m.Machines { - //TODO: what if a list has mixed readers and writers? Now it's treated as readonly - allCanWrite := false - for _, entry := range list { - allCanWrite = allCanWrite && entry.CanWrite - } - if allCanWrite { - m.writers = append(m.writers, list) + decoder.Decode(&m.GlobalVolumeSequence) + + //add to vid2machineId map, and writers array + for machine_index, machine := range m.Machines { + for _, v := range machine.Volumes { + m.vid2machineId[v.Id] = machine_index + if v.Size < ChunkSizeLimit { + m.writers = append(m.writers, machine_index) + } } } - log.Println("Loaded mapping size", len(m.Machines)) + log.Println("Loaded mapping size", len(m.Machines)) } return } -func (m *Mapper) PickForWrite() []Machine { - vid := rand.Intn(len(m.Machines)) - return m.Machines[vid] +func (m *Mapper) PickForWrite() *Machine { + vid := rand.Intn(len(m.writers)) + return m.Machines[m.writers[vid]] } -func (m *Mapper) Get(vid int) []Machine { +func (m *Mapper) Get(vid int) *Machine { return m.Machines[vid] } -func (m *Mapper) Add(machine Machine, volumes []storage.VolumeStat, capacity int) []int { - log.Println("Adding existing", machine.Server, len(volumes), "volumes to dir", len(m.Machines)) - log.Println("Adding new ", machine.Server, capacity - len(volumes), "volumes to dir", len(m.Machines)) - maxId := len(m.Machines)-1 - for _, v := range volumes { - if maxId < int(v.Id) { - maxId = int(v.Id) +func (m *Mapper) Add(machine Machine) []uint64 { + log.Println("Adding existing", machine.Server, len(machine.Volumes), "volumes to dir", len(m.Machines)) + log.Println("Adding new ", machine.Server, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines)) + //check existing machine, linearly + m.lock.Lock() + foundExistingMachineId := -1 + for index, entry := range m.Machines { + if machine.Server == entry.Server { + foundExistingMachineId = index + break } } - for i := len(m.Machines); i <= maxId; i++ { - m.Machines = append(m.Machines, nil) - } - log.Println("Machine list now is", len(m.Machines)) - for _, v := range volumes { - found := false - existing := m.Machines[v.Id] - for _, entry := range existing { - if machine.Server == entry.Server { - found = true - break - } - } - if !found { - m.Machines[v.Id] = append(existing, machine) - log.Println("Setting volume", v.Id, "to", machine.Server) - } + machineId := foundExistingMachineId + if machineId < 0 { + machineId = len(m.Machines) + m.Machines = append(m.Machines, &machine) } - vids := new([]int) - for vid,i := len(m.Machines),len(volumes); i < capacity; i,vid=i+1,vid+1 { - list := new([]Machine) - *list = append(*list, machine) - m.Machines = append(m.Machines, *list) + //generate new volumes + vids := new([]uint64) + for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 { + newVolume := *new(storage.VolumeInfo) + newVolume.Id, newVolume.Size = vid, 0 + machine.Volumes = append(machine.Volumes, newVolume) + m.vid2machineId[vid] = machineId log.Println("Adding volume", vid, "from", machine.Server) *vids = append(*vids, vid) + m.GlobalVolumeSequence = vid + 1 } m.Save() - log.Println("Dir size =>", len(m.Machines)) + m.lock.Unlock() + + //add to vid2machineId map, and writers array + for _, v := range machine.Volumes { + log.Println("Setting volume", v.Id, "to", machine.Server) + m.vid2machineId[v.Id] = machineId + if v.Size < ChunkSizeLimit { + m.writers = append(m.writers, machineId) + } + } + //setting writers, copy-on-write because of possible updating + var writers []int + for machine_index, machine_entry := range m.Machines { + for _, v := range machine_entry.Volumes { + if v.Size < ChunkSizeLimit { + writers = append(writers, machine_index) + } + } + } + m.writers = writers + + log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.writers)) return *vids } func (m *Mapper) Save() { @@ -112,4 +140,5 @@ func (m *Mapper) Save() { defer dataFile.Close() encoder := gob.NewEncoder(dataFile) encoder.Encode(m.Machines) + encoder.Encode(m.GlobalVolumeSequence) } diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index e13f47257..ff6944853 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -16,9 +16,9 @@ type Store struct { Port int PublicServer string } -type VolumeStat struct { - Id uint64 "id" - CanWrite bool +type VolumeInfo struct { + Id uint64 + Size int64 } func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { @@ -44,10 +44,10 @@ func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) ( } func (s *Store) Join(mserver string) { - stats := new([]*VolumeStat) - for k, _ := range s.volumes { - s := new(VolumeStat) - s.Id, s.CanWrite = k, true + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size = k, v.Size() *stats = append(*stats, s) } bytes, _ := json.Marshal(stats) diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go index 3acd2d0e3..4b7f6ab3a 100644 --- a/weed-fs/src/pkg/storage/volume.go +++ b/weed-fs/src/pkg/storage/volume.go @@ -38,12 +38,12 @@ func NewVolume(dirname string, id uint64) (v *Volume) { return } -func (v *Volume) CanWrite(limit int64) bool { +func (v *Volume) Size() int64 { stat, e:=v.dataFile.Stat() if e!=nil{ - return stat.Size < limit + return stat.Size } - return false + return -1 } func (v *Volume) Close() { close(v.accessChannel) |
