aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/pkg
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-04 00:26:38 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-04 00:26:38 -0700
commita74f6cf59315ffddc6f94410b33f08eb1807170f (patch)
tree20a47285c482fa8903a436e3d9dc9650412e86e7 /weed-fs/src/pkg
parent85a1b419c071d618c9cdfbe75cc8d0aa394db53b (diff)
downloadseaweedfs-a74f6cf59315ffddc6f94410b33f08eb1807170f.tar.xz
seaweedfs-a74f6cf59315ffddc6f94410b33f08eb1807170f.zip
change to vid~machines mapping
Diffstat (limited to 'weed-fs/src/pkg')
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go62
1 files changed, 45 insertions, 17 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index 6b52268f7..fd1df6e39 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -18,10 +18,10 @@ type Machine struct {
}
type Mapper struct {
- Machines map[string]*Machine
- vid2machine map[storage.VolumeId]*Machine
- Writers []storage.VolumeId // transient array of Writers volume id
- pulse int64
+ Machines map[string]*Machine
+ vid2machines map[storage.VolumeId][]*Machine
+ Writers []storage.VolumeId // transient array of Writers volume id
+ pulse int64
volumeSizeLimit uint64
@@ -34,7 +34,7 @@ func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo, lastSeen
func NewMapper(dirname string, filename string, volumeSizeLimit uint64, pulse int) (m *Mapper) {
m = &Mapper{}
- m.vid2machine = make(map[storage.VolumeId]*Machine)
+ m.vid2machines = make(map[storage.VolumeId][]*Machine)
m.volumeSizeLimit = volumeSizeLimit
m.Writers = *new([]storage.VolumeId)
m.Machines = make(map[string]*Machine)
@@ -51,37 +51,65 @@ func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) {
return "", 0, nil, errors.New("No more writable volumes!")
}
vid := m.Writers[rand.Intn(len_writers)]
- machine := m.vid2machine[vid]
- if machine != nil {
+ machines := m.vid2machines[vid]
+ if machines != nil && len(machines)>0 {
fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1))
if count == 0 {
return "", 0, nil, errors.New("Strange count:" + c)
}
- return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine, nil
+ //always use the first server to write
+ return NewFileId(vid, fileId, rand.Uint32()).String(), count, machines[0], nil
}
return "", 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
-func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) {
- machine := m.vid2machine[vid]
- if machine == nil {
+func (m *Mapper) Get(vid storage.VolumeId) ([]*Machine, error) {
+ machines := m.vid2machines[vid]
+ if machines == nil {
return nil, errors.New("invalid volume id " + vid.String())
}
- return machine, nil
+ return machines, nil
}
func (m *Mapper) Add(machine *Machine) {
m.Machines[machine.Url] = machine
//add to vid2machine map, and Writers array
for _, v := range machine.Volumes {
- m.vid2machine[v.Id] = machine
+ list := m.vid2machines[v.Id]
+ found := false
+ for index, entry := range list {
+ if machine.Url == entry.Url {
+ list[index] = machine
+ found = true
+ }
+ }
+ if !found {
+ m.vid2machines[v.Id] = append(m.vid2machines[v.Id], machine)
+ }
}
m.refreshWritableVolumes()
}
func (m *Mapper) remove(machine *Machine) {
- delete(m.Machines,machine.Url)
- for _, v := range machine.Volumes {
- delete(m.vid2machine,v.Id)
- }
+ delete(m.Machines, machine.Url)
+ for _, v := range machine.Volumes {
+ list := m.vid2machines[v.Id]
+ foundIndex := -1
+ for index, entry := range list {
+ if machine.Url == entry.Url {
+ foundIndex = index
+ }
+ }
+ m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id])
+ }
+}
+func deleteFromSlice(i int, slice []*Machine) []*Machine{
+ switch i {
+ case -1://do nothing
+ case 0: slice = slice[1:]
+ case len(slice)-1: slice = slice[:len(slice)-1]
+ default: slice = append(slice[:i], slice[i+1:]...)
+ }
+ return slice
}
+
func (m *Mapper) StartRefreshWritableVolumes() {
go func() {
for {