diff options
| author | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-16 14:51:26 +0000 |
|---|---|---|
| committer | chris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0> | 2011-12-16 14:51:26 +0000 |
| commit | 23ecd7bb33e8c22b07f630fb963cbdc5c072adad (patch) | |
| tree | 6408c31e48fde5cb2dcf34266504dff3a014388f /weed-fs/src/pkg | |
| parent | 02f4e7b82c5763efb6932bcc3f97dc549b66e4a9 (diff) | |
| download | seaweedfs-23ecd7bb33e8c22b07f630fb963cbdc5c072adad.tar.xz seaweedfs-23ecd7bb33e8c22b07f630fb963cbdc5c072adad.zip | |
split into server and clients
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@9 282b0af5-e82d-9cf1-ede4-77906d7719d0
Diffstat (limited to 'weed-fs/src/pkg')
| -rw-r--r-- | weed-fs/src/pkg/directory/volume_mapping.go | 17 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/needle.go | 67 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/needle_map.go | 53 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/store.go | 69 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/util.go | 50 | ||||
| -rw-r--r-- | weed-fs/src/pkg/storage/volume.go | 66 |
6 files changed, 313 insertions, 9 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 95ec57d2e..459d7f2b1 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -6,13 +6,19 @@ import ( "path" "rand" "log" - "store" + "storage" ) type Machine struct { Server string //<server name/ip>[:port] PublicServer string } +type Mapper struct { + dir string + FileName string + Id2Machine map[uint32][]*Machine + LastId uint32 +} func NewMachine(server, publicServer string) (m *Machine) { m = new(Machine) @@ -20,13 +26,6 @@ func NewMachine(server, publicServer string) (m *Machine) { return } -type Mapper struct { - dir string - FileName string - Id2Machine map[uint32][]*Machine - LastId uint32 -} - func NewMapper(dirname string, filename string) (m *Mapper) { m = new(Mapper) m.dir = dirname @@ -51,7 +50,7 @@ func (m *Mapper) PickForWrite() []*Machine { func (m *Mapper) Get(vid uint32) []*Machine { return m.Id2Machine[vid] } -func (m *Mapper) Add(machine *Machine, volumes []store.VolumeStat) { +func (m *Mapper) Add(machine *Machine, volumes []storage.VolumeStat) { log.Println("Adding store node", machine.Server) for _, v := range volumes { existing := m.Id2Machine[uint32(v.Id)] diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go new file mode 100644 index 000000000..7fde63953 --- /dev/null +++ b/weed-fs/src/pkg/storage/needle.go @@ -0,0 +1,67 @@ +package storage + +import ( + "io" + "io/ioutil" + "http" + "log" + "strconv" + "strings" +) + +type Needle struct{ + Cookie uint8 "random number to mitigate brute force lookups" + Key uint64 "file id" + AlternateKey uint32 "supplemental id" + Size uint32 "Data size" + Data []byte "The actual file data" + Checksum int32 "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" +} +func NewNeedle(r *http.Request)(n *Needle){ + n = new(Needle) + form,fe:=r.MultipartReader() + if fe!=nil { + log.Fatalf("MultipartReader [ERROR] %s\n", fe) + } + part,_:=form.NextPart() + data,_:=ioutil.ReadAll(part) + n.Data = data + + n.ParsePath(r.URL.Path[1:strings.LastIndex(r.URL.Path,".")]) + + return +} +func (n *Needle) ParsePath(path string){ + a := strings.Split(path,"_") + log.Println("cookie",a[0],"key",a[1],"altKey",a[2]) + cookie,_ := strconv.Atoi(a[0]) + n.Cookie = uint8(cookie) + n.Key,_ = strconv.Atoui64(a[1]) + altKey,_ := strconv.Atoui64(a[2]) + n.AlternateKey = uint32(altKey) +} +func (n *Needle) Append(w io.Writer){ + header := make([]byte,17) + header[0] = n.Cookie + uint64toBytes(header[1:9],n.Key) + uint32toBytes(header[9:13],n.AlternateKey) + n.Size = uint32(len(n.Data)) + uint32toBytes(header[13:17],n.Size) + w.Write(header) + w.Write(n.Data) + rest := 8-((n.Size+17+4)%8) + uint32toBytes(header[0:4],uint32(n.Checksum)) + w.Write(header[0:rest+4]) +} +func (n *Needle) Read(r io.Reader, size uint32){ + bytes := make([]byte,size+17+4) + r.Read(bytes) + n.Cookie = bytes[0] + n.Key = bytesToUint64(bytes[1:9]) + n.AlternateKey = bytesToUint32(bytes[9:13]) + n.Size = bytesToUint32(bytes[13:17]) + n.Data = bytes[17:17+size] + n.Checksum = int32(bytesToUint32(bytes[17+size:17+size+4])) +} + diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go new file mode 100644 index 000000000..9669536ac --- /dev/null +++ b/weed-fs/src/pkg/storage/needle_map.go @@ -0,0 +1,53 @@ +package storage + +import ( + "os" +) + +type NeedleKey struct{ + Key uint64 "file id" + AlternateKey uint32 "supplemental id" +} +func (k *NeedleKey) String() string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(k.Key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(k.AlternateKey >> (8*i)); + } + return string(tmp[:]) +} + +type NeedleValue struct{ + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" +} + +type NeedleMap struct{ + m map[string]*NeedleValue //mapping NeedleKey(Key,AlternateKey) to NeedleValue +} +func NewNeedleMap() (nm *NeedleMap){ + nm = new(NeedleMap) + nm.m = make(map[string]*NeedleValue) + return +} +func (nm *NeedleMap) load(file *os.File){ +} +func makeKey(key uint64, altKey uint32) string { + var tmp [12]byte + for i :=uint(0);i<8;i++{ + tmp[i] = byte(key >> (8*i)); + } + for i :=uint(0);i<4;i++{ + tmp[i+8] = byte(altKey >> (8*i)); + } + return string(tmp[:]) +} +func (nm *NeedleMap) put(key uint64, altKey uint32, offset uint32, size uint32){ + nm.m[makeKey(key,altKey)] = &NeedleValue{Offset:offset, Size:size} +} +func (nm *NeedleMap) get(key uint64, altKey uint32) (element *NeedleValue, ok bool){ + element, ok = nm.m[makeKey(key,altKey)] + return +} diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go new file mode 100644 index 000000000..201f50528 --- /dev/null +++ b/weed-fs/src/pkg/storage/store.go @@ -0,0 +1,69 @@ +package storage + +import ( + "log" + "io/ioutil" + "json" + "strings" + "strconv" + "url" +) + +type Store struct { + volumes map[uint64]*Volume + dir string + Port int + PublicServer string +} +type VolumeStat struct { + Id uint64 "id" + Status int "status" //0:read, 1:write +} + +func NewStore(port int, publicServer, dirname string) (s *Store) { + s = new(Store) + s.Port, s.PublicServer, s.dir = port, publicServer, dirname + s.volumes = make(map[uint64]*Volume) + + counter := uint64(0) + files, _ := ioutil.ReadDir(dirname) + for _, f := range files { + if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") { + continue + } + id, err := strconv.Atoui64(f.Name[:-4]) + if err == nil { + continue + } + s.volumes[counter] = NewVolume(s.dir, id) + counter++ + } + log.Println("Store started on dir:", dirname, "with", counter, "existing volumes") + return +} + +func (s *Store) Join(mserver string) { + stats := make([]*VolumeStat, len(s.volumes)) + for k, _ := range s.volumes { + s := new(VolumeStat) + s.Id, s.Status = k, 1 + stats = append(stats, s) + } + bytes, _ := json.Marshal(stats) + values := make(url.Values) + values.Add("port", strconv.Itoa(s.Port)) + values.Add("publicServer", s.PublicServer) + values.Add("volumes", string(bytes)) + post("http://"+mserver+"/dir/join", values) +} +func (s *Store) Close() { + for _, v := range s.volumes { + v.Close() + } +} +func (s *Store) Write(i uint64, n *Needle) { + s.volumes[i].write(n) +} +func (s *Store) Read(i uint64, n *Needle) { + s.volumes[i].read(n) +} diff --git a/weed-fs/src/pkg/storage/util.go b/weed-fs/src/pkg/storage/util.go new file mode 100644 index 000000000..3315dbe01 --- /dev/null +++ b/weed-fs/src/pkg/storage/util.go @@ -0,0 +1,50 @@ +package storage + +import ( + "http" + "io/ioutil" + "url" + "log" +) + +func bytesToUint64(b []byte)(v uint64){ + for i :=uint(7);i>0;i-- { + v += uint64(b[i]) + v <<= 8 + } + v+=uint64(b[0]) + return +} +func bytesToUint32(b []byte)(v uint32){ + for i :=uint(3);i>0;i-- { + v += uint32(b[i]) + v <<= 8 + } + v+=uint32(b[0]) + return +} +func uint64toBytes(b []byte, v uint64){ + for i :=uint(0);i<8;i++ { + b[i] = byte(v>>(i*8)) + } +} +func uint32toBytes(b []byte, v uint32){ + for i :=uint(0);i<4;i++ { + b[i] = byte(v>>(i*8)) + } +} + +func post(url string, values url.Values)string{ + r, err := http.PostForm(url, values) + if err != nil { + log.Println("post:", err) + return "" + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("post:", err) + return "" + } + return string(b) +}
\ No newline at end of file diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go new file mode 100644 index 000000000..6b6db1ba2 --- /dev/null +++ b/weed-fs/src/pkg/storage/volume.go @@ -0,0 +1,66 @@ +package storage + +import ( + "os" + "path" + "strconv" + "log" +) + +type Volume struct { + Id uint64 + dir string + dataFile, indexFile *os.File + nm *NeedleMap + + accessChannel chan int +} + +func NewVolume(dirname string, id uint64) (v *Volume) { + var e os.Error + v = new(Volume) + v.dir = dirname + v.Id = id + fileName := strconv.Uitoa64(v.Id) + log.Println("file", v.dir, "/", fileName) + v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + log.Fatalf("New Volume [ERROR] %s\n", e) + } + v.nm = NewNeedleMap() + v.nm.load(v.indexFile) + + v.accessChannel = make(chan int, 1) + v.accessChannel <- 0 + + return +} +func (v *Volume) Close() { + close(v.accessChannel) + v.dataFile.Close() + v.indexFile.Close() +} + +func (v *Volume) write(n *Needle) { + counter := <-v.accessChannel + offset, _ := v.dataFile.Seek(0, 2) + n.Append(v.dataFile) + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if !ok || int64(nv.Offset)*8 < offset { + v.nm.put(n.Key, n.AlternateKey, uint32(offset/8), n.Size) + } + v.accessChannel <- counter + 1 +} +func (v *Volume) read(n *Needle) { + counter := <-v.accessChannel + nv, ok := v.nm.get(n.Key, n.AlternateKey) + if ok && nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*8, 0) + n.Read(v.dataFile, nv.Size) + } + v.accessChannel <- counter + 1 +} |
