diff options
| author | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-26 09:43:17 +0000 |
|---|---|---|
| committer | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-26 09:43:17 +0000 |
| commit | 6913fabc640fe24b180dbbbb2bfd7193e93d71f9 (patch) | |
| tree | 09b24e73b8b3c2891001e74f2f2dce9478540335 /weed-fs/src/pkg | |
| parent | ae3a53388f75d57d3b3bef803891423418871c51 (diff) | |
| download | seaweedfs-6913fabc640fe24b180dbbbb2bfd7193e93d71f9.tar.xz seaweedfs-6913fabc640fe24b180dbbbb2bfd7193e93d71f9.zip | |
simplifying volume id mechanism, removing automatic volume id generation and discovering
periodically report machine status
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@25 282b0af5-e82d-9cf1-ede4-77906d7719d0
Diffstat (limited to 'weed-fs/src/pkg')
| -rw-r--r-- | weed-fs/src/pkg/directory/volume_mapping.go | 82 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/store.go | 79 |
2 files changed, 53 insertions, 108 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 3c10e1a78..a4cce21df 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -22,65 +22,40 @@ type MachineInfo struct { type Machine struct { Server MachineInfo Volumes []storage.VolumeInfo - Capacity int } type Mapper struct { dir string fileName string - capacity int lock sync.Mutex Machines []*Machine vid2machineId map[uint32]int Writers []int // transient array of Writers volume id - GlobalVolumeSequence uint32 - FileIdSequence uint64 fileIdCounter uint64 } -func NewMachine(server, publicServer string, volumes []storage.VolumeInfo, capacity int) *Machine { - return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes, Capacity: capacity} +func NewMachine(server, publicServer string, volumes []storage.VolumeInfo) *Machine { + return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicServer}, Volumes: volumes} } -func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { - m = &Mapper{dir: dirname, fileName: filename, capacity: capacity} - log.Println("Loading volume id to maching mapping:", path.Join(m.dir, m.fileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_RDONLY, 0644) +func NewMapper(dirname string, filename string) (m *Mapper) { + m = &Mapper{dir: dirname, fileName: filename} m.vid2machineId = make(map[uint32]int) m.Writers = *new([]int) - if e != nil { - log.Println("Mapping File Read", e) - m.Machines = *new([]*Machine) - } else { - decoder := gob.NewDecoder(dataFile) - defer dataFile.Close() - decoder.Decode(&m.Machines) - decoder.Decode(&m.GlobalVolumeSequence) - - //add to vid2machineId map, and Writers array - for machine_index, machine := range m.Machines { - for _, v := range machine.Volumes { - m.vid2machineId[v.Id] = machine_index - if v.Size < ChunkSizeLimit { - m.Writers = append(m.Writers, machine_index) - } - } - } - log.Println("Loaded mapping size", len(m.Machines)) - } + m.Machines = *new([]*Machine) seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) if se != nil { m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) + log.Println("Setting file id sequence", m.FileIdSequence) } else { decoder := gob.NewDecoder(seqFile) defer seqFile.Close() decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence + FileIdSaveInterval) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) //in case the server stops between intervals m.FileIdSequence += FileIdSaveInterval } @@ -89,22 +64,20 @@ func NewMapper(dirname string, filename string, capacity int) (m *Mapper) { func (m *Mapper) PickForWrite() (string, MachineInfo) { machine := m.Machines[m.Writers[rand.Intn(len(m.Writers))]] vid := machine.Volumes[rand.Intn(len(machine.Volumes))].Id - return NewFileId(vid,m.NextFileId(),rand.Uint32()).String(), machine.Server + return NewFileId(vid, m.NextFileId(), rand.Uint32()).String(), machine.Server } func (m *Mapper) NextFileId() uint64 { - if m.fileIdCounter <= 0 { - m.fileIdCounter = FileIdSaveInterval - m.saveSequence() - } + if m.fileIdCounter <= 0 { + m.fileIdCounter = FileIdSaveInterval + m.saveSequence() + } m.fileIdCounter-- return m.FileIdSequence - m.fileIdCounter } func (m *Mapper) Get(vid uint32) *Machine { return m.Machines[m.vid2machineId[vid]] } -func (m *Mapper) Add(machine Machine) []uint32 { - log.Println("Adding existing", machine.Server.Url, len(machine.Volumes), "volumes to dir", len(m.Machines)) - log.Println("Adding new ", machine.Server.Url, machine.Capacity-len(machine.Volumes), "volumes to dir", len(m.Machines)) +func (m *Mapper) Add(machine Machine){ //check existing machine, linearly m.lock.Lock() foundExistingMachineId := -1 @@ -119,24 +92,11 @@ func (m *Mapper) Add(machine Machine) []uint32 { machineId = len(m.Machines) m.Machines = append(m.Machines, &machine) } - - //generate new volumes - vids := new([]uint32) - for vid, i := m.GlobalVolumeSequence, len(machine.Volumes); i < machine.Capacity; i, vid = i+1, vid+1 { - newVolume := storage.VolumeInfo{Id: vid, Size: 0} - machine.Volumes = append(machine.Volumes, newVolume) - m.vid2machineId[vid] = machineId - log.Println("Adding volume", vid, "from", machine.Server.Url) - *vids = append(*vids, vid) - m.GlobalVolumeSequence = vid + 1 - } - - m.Save() m.lock.Unlock() //add to vid2machineId map, and Writers array for _, v := range machine.Volumes { - log.Println("Setting volume", v.Id, "to", machine.Server.Url) + //log.Println("Setting volume", v.Id, "to", machine.Server.Url) m.vid2machineId[v.Id] = machineId if v.Size < ChunkSizeLimit { m.Writers = append(m.Writers, machineId) @@ -152,20 +112,6 @@ func (m *Mapper) Add(machine Machine) []uint32 { } } m.Writers = Writers - - log.Println("Machines:", len(m.Machines), "Volumes:", len(m.vid2machineId), "Writable:", len(m.Writers)) - return *vids -} -func (m *Mapper) Save() { - log.Println("Saving virtual to physical:", path.Join(m.dir, m.fileName+".map")) - dataFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".map"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Mapping File Save [ERROR] %s\n", e) - } - defer dataFile.Close() - encoder := gob.NewEncoder(dataFile) - encoder.Encode(m.Machines) - encoder.Encode(m.GlobalVolumeSequence) } func (m *Mapper) saveSequence() { log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go index 14f69bf1d..f4f71e541 100644 --- a/weed-fs/src/pkg/storage/store.go +++ b/weed-fs/src/pkg/storage/store.go @@ -2,7 +2,6 @@ package storage import ( "log" - "io/ioutil" "json" "os" "strings" @@ -13,7 +12,6 @@ import ( type Store struct { volumes map[uint64]*Volume - capacity int dir string Port int PublicServer string @@ -23,35 +21,48 @@ type VolumeInfo struct { Size int64 } -func NewStore(port int, publicServer, dirname string, chunkSize, capacity int) (s *Store) { - s = &Store{Port: port, PublicServer: publicServer, dir: dirname, capacity: capacity} +func NewStore(port int, publicServer, dirname string, volumeListString string) (s *Store) { + s = &Store{Port: port, PublicServer: publicServer, dir: dirname} s.volumes = make(map[uint64]*Volume) - - files, _ := ioutil.ReadDir(dirname) - for _, f := range files { - if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { - continue - } - id, err := strconv.Atoui64(f.Name[0:(strings.LastIndex(f.Name, ".dat"))]) - log.Println("Loading data file name:", f.Name) - if err != nil { - continue + + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string + id, err := strconv.Atoui64(id_string) + if err != nil { + log.Println("Volume Id", id_string, "is not a valid unsigned integer! Skipping it...") + continue + } + s.volumes[id] = NewVolume(s.dir, uint32(id)) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.Atoui64(pair[0]) + if start_err != nil { + log.Println("Volume Id", pair[0], "is not a valid unsigned integer! Skipping it...") + continue + } + end, end_err := strconv.Atoui64(pair[1]) + if end_err != nil { + log.Println("Volume Id", pair[1], "is not a valid unsigned integer! Skipping it...") + continue + } + for id := start; id<=end; id++ { + s.volumes[id] = NewVolume(s.dir, uint32(id)) + } } - s.volumes[id] = NewVolume(s.dir, uint32(id)) } log.Println("Store started on dir:", dirname, "with", len(s.volumes), "existing volumes") - log.Println("Expected capacity=", s.capacity, "volumes") return } -func (s *Store) Status()(*[]*VolumeInfo){ - stats := new([]*VolumeInfo) - for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size = uint32(k), v.Size() - *stats = append(*stats, s) - } - return stats +func (s *Store) Status() *[]*VolumeInfo { + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size = uint32(k), v.Size() + *stats = append(*stats, s) + } + return stats } func (s *Store) Join(mserver string) { stats := new([]*VolumeInfo) @@ -65,29 +76,17 @@ func (s *Store) Join(mserver string) { values.Add("port", strconv.Itoa(s.Port)) values.Add("publicServer", s.PublicServer) values.Add("volumes", string(bytes)) - log.Println("Registering exiting volumes", string(bytes)) - values.Add("capacity", strconv.Itoa(s.capacity)) - retString := util.Post("http://"+mserver+"/dir/join", values) - if retString != nil { - newVids := new([]int) - log.Println("Instructed to create volume", string(retString)) - e := json.Unmarshal(retString, newVids) - if e == nil { - for _, vid := range *newVids { - s.volumes[uint64(vid)] = NewVolume(s.dir, uint32(vid)) - log.Println("Adding volume", vid) - } - } - } + log.Println("Exiting volumes", string(bytes)) + util.Post("http://"+mserver+"/dir/join", values) } func (s *Store) Close() { for _, v := range s.volumes { v.Close() } } -func (s *Store) Write(i uint64, n *Needle) (uint32){ +func (s *Store) Write(i uint64, n *Needle) uint32 { return s.volumes[i].write(n) } -func (s *Store) Read(i uint64, n *Needle) (int, os.Error){ +func (s *Store) Read(i uint64, n *Needle) (int, os.Error) { return s.volumes[i].read(n) } |
