aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2011-12-11 08:42:21 +0000
committerchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2011-12-11 08:42:21 +0000
commitda97f864472c6ed49b77791271b0dae0d66645fa (patch)
tree47d3f59cd8037e11467b7da3542bd7ca0e122d4e
parent8fade18e40cef62cc7305bb6924f838f3c5c1b90 (diff)
downloadseaweedfs-da97f864472c6ed49b77791271b0dae0d66645fa.tar.xz
seaweedfs-da97f864472c6ed49b77791271b0dae0d66645fa.zip
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@4 282b0af5-e82d-9cf1-ede4-77906d7719d0
-rw-r--r--weed-fs/note/weedfs.txt46
-rw-r--r--weed-fs/src/cmd/gdir.go18
-rw-r--r--weed-fs/src/cmd/gstore.go57
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go18
-rw-r--r--weed-fs/src/pkg/store/store.go90
-rw-r--r--weed-fs/src/pkg/store/util.go22
-rw-r--r--weed-fs/src/pkg/store/volume.go15
7 files changed, 199 insertions, 67 deletions
diff --git a/weed-fs/note/weedfs.txt b/weed-fs/note/weedfs.txt
new file mode 100644
index 000000000..d07ad9484
--- /dev/null
+++ b/weed-fs/note/weedfs.txt
@@ -0,0 +1,46 @@
+How to submit a content
+1. Find physical volumes
+1.c Create a hash value
+1.d find a write logic volume id, and return [logic volume id, {physical volume ids}]
+2. submit to physical volumes
+2.c
+ generate the cookie
+ generate a unique id as key
+ choose the right altKey
+ send bytes to physical volumes
+2.s each
+ save bytes
+ store map[key uint64, altKey uint32]<offset, size>
+ for updated entry, set old entry's offset to zero
+3.c
+ wait for all physical volumes to finish
+ store the /<logic volume id>/<key>_<cookie>_<altKey>.<ext>
+
+How to retrieve a content
+1.c
+ send logic volume id
+1.d
+ find least busy volume's id
+2.c
+ send URI /<physical volume id>/<key>_<cookie>_<altKey>.<ext>
+
+
+How to submit a content
+1. send bytes to weedfs, got <volume id, key uint64, cookie code>
+ store <key uint64, volume id uint32, cookie code uint32, ext>, and other information
+
+To read a content
+2. use logic volume id to lookup a <machine id>
+ render url as /<machine id>/<volume id>/<key>/<cookie>.ext
+
+The directory server
+0.init
+ load and collect <logic volume id, machine ids> mapping
+1.on submit content
+ find a free logic volume id, start sending content to 3 machines
+ if all of them finishes, return <logic volume id, key, cookie code>
+2.on read content
+ based on logic volume id, pick a machine with less load,
+ return <machine id>
+
+
diff --git a/weed-fs/src/cmd/gdir.go b/weed-fs/src/cmd/gdir.go
deleted file mode 100644
index 34c5733be..000000000
--- a/weed-fs/src/cmd/gdir.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package main
-
-import (
- "directory"
- // "runtime"
- "log"
-)
-
-func main() {
- m := directory.NewMapper("/tmp", "directory")
- log.Println("map size", len(m.Virtual2physical))
- m.Add(10, 11,12,13)
- m.Add(20, 21,22,23)
- log.Println("map(10)", m.Get(10))
- log.Println("map size", len(m.Virtual2physical))
- m.Save()
- defer m.Save()
-}
diff --git a/weed-fs/src/cmd/gstore.go b/weed-fs/src/cmd/gstore.go
index 8bd240902..a86e30373 100644
--- a/weed-fs/src/cmd/gstore.go
+++ b/weed-fs/src/cmd/gstore.go
@@ -6,9 +6,11 @@ import (
"flag"
"fmt"
"http"
+ "json"
"log"
"mime"
"os"
+ "rand"
"strconv"
"strings"
)
@@ -18,6 +20,10 @@ var (
chunkFolder = flag.String("dir", "/tmp", "data directory to store files")
chunkCount = flag.Int("chunks", 5, "data chunks to store files")
chunkEnabled = flag.Bool("data", false, "act as a store server")
+ chunkServer = flag.String("cserver", "localhost:8080", "chunk server to store data")
+ publicServer = flag.String("pserver", "localhost:8080", "public server to serve data read")
+ metaServer = flag.String("mserver", "localhost:8080", "metadata server to store mappings")
+
metaEnabled = flag.Bool("meta", false, "act as a directory server")
metaFolder = flag.String("mdir", "/tmp", "data directory to store mappings")
)
@@ -41,7 +47,7 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
n := new(store.Needle)
path := r.URL.Path
sepIndex := strings.Index(path[1:], "/") + 1
- volumeId, _ := strconv.Atoi(path[1:sepIndex])
+ volumeId, _ := strconv.Atoui64(path[1:sepIndex])
dotIndex := strings.LastIndex(path, ".")
n.ParsePath(path[sepIndex+1 : dotIndex])
ext := path[dotIndex:]
@@ -51,14 +57,47 @@ func (s *Haystack) GetHandler(w http.ResponseWriter, r *http.Request) {
w.Write(n.Data)
}
func (s *Haystack) PostHandler(w http.ResponseWriter, r *http.Request) {
- volumeId := s.store.Write(store.NewNeedle(r))
+ volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
+ s.store.Write(volumeId, store.NewNeedle(r))
w.Header().Set("Content-Type", "text/plain")
fmt.Fprint(w, "volumeId=", volumeId, "\n")
}
func (s *Haystack) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
-func directoryHandler(w http.ResponseWriter, r *http.Request) {
+func dirReadHandler(w http.ResponseWriter, r *http.Request) {
+ volumeId, _ := strconv.Atoui64(r.FormValue("volumeId"))
+ machineList := server.directory.Get((uint32)(volumeId))
+ x := rand.Intn(len(machineList))
+ machine := machineList[x]
+ bytes, _ := json.Marshal(machine)
+ callback := r.FormValue("callback")
+ w.Header().Set("Content-Type", "application/javascript")
+ if callback == "" {
+ w.Write(bytes)
+ } else {
+ w.Write([]uint8(callback))
+ w.Write([]uint8("("))
+ w.Write(bytes)
+ w.Write([]uint8(")"))
+ }
+}
+func dirWriteHandler(w http.ResponseWriter, r *http.Request) {
+ machineList := server.directory.PickForWrite()
+ bytes, _ := json.Marshal(machineList)
+ callback := r.FormValue("callback")
+ w.Header().Set("Content-Type", "application/javascript")
+ if callback == "" {
+ w.Write(bytes)
+ } else {
+ w.Write([]uint8(callback))
+ w.Write([]uint8("("))
+ w.Write(bytes)
+ w.Write([]uint8(")"))
+ }
+}
+func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+
}
var server *Haystack
@@ -66,22 +105,24 @@ var server *Haystack
func main() {
flag.Parse()
if !*chunkEnabled && !*metaEnabled {
- fmt.Fprintf(os.Stderr, "Need to act as either a store server or a directory server, or both\n")
- flag.PrintDefaults()
- os.Exit(-1)
+ fmt.Fprintf(os.Stdout, "Act as both a store server and a directory server\n")
}
server = new(Haystack)
if *chunkEnabled {
fmt.Fprintf(os.Stdout, "Chunk data stored in %s\n", *chunkFolder)
- server.store = store.NewStore(*chunkFolder, *chunkCount)
+ server.store = store.NewStore(*chunkServer, *publicServer, *chunkFolder)
defer server.store.Close()
http.HandleFunc("/", storeHandler)
}
if *metaEnabled {
server.directory = directory.NewMapper(*metaFolder, "directory")
defer server.directory.Save()
- http.HandleFunc("/directory", directoryHandler)
+ http.HandleFunc("/dir/read", dirReadHandler)
+ http.HandleFunc("/dir/write", dirWriteHandler)
+ http.HandleFunc("/dir/join", dirJoinHandler)
}
+
+ server.store.Join(*metaServer)
log.Println("Serving at http://127.0.0.1:" + strconv.Itoa(*port))
http.ListenAndServe(":"+strconv.Itoa(*port), nil)
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index 60fef7d7d..963145153 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -3,13 +3,17 @@ package directory
import (
"gob"
"os"
+ "rand"
"log"
)
+type Machine struct {
+ Server string //<server name/ip>[:port]
+}
type Mapper struct {
dir string
fileName string
- Virtual2physical map[uint32][]uint32
+ Virtual2physical map[uint32][]Machine
}
func NewMapper(dirname string, filename string) (m *Mapper) {
@@ -21,17 +25,21 @@ func NewMapper(dirname string, filename string) (m *Mapper) {
if e != nil {
log.Fatalf("Mapping File Read [ERROR] %s\n", e)
} else {
- m.Virtual2physical = make(map[uint32][]uint32)
+ m.Virtual2physical = make(map[uint32][]Machine)
decoder := gob.NewDecoder(dataFile)
decoder.Decode(m.Virtual2physical)
dataFile.Close()
}
return
}
-func (m *Mapper) Get(vid uint32) []uint32 {
+func (m *Mapper) PickForWrite() []Machine {
+ vid := uint32(rand.Intn(len(m.Virtual2physical)))
+ return m.Virtual2physical[vid]
+}
+func (m *Mapper) Get(vid uint32) []Machine {
return m.Virtual2physical[vid]
}
-func (m *Mapper) Add(vid uint32, pids ...uint32) {
+func (m *Mapper) Add(vid uint32, pids ...Machine) {
m.Virtual2physical[vid] = append(m.Virtual2physical[vid], pids...)
}
func (m *Mapper) Save() {
@@ -41,7 +49,7 @@ func (m *Mapper) Save() {
log.Fatalf("Mapping File Save [ERROR] %s\n", e)
}
defer dataFile.Close()
- m.Virtual2physical = make(map[uint32][]uint32)
+ m.Virtual2physical = make(map[uint32][]Machine)
encoder := gob.NewEncoder(dataFile)
encoder.Encode(m.Virtual2physical)
}
diff --git a/weed-fs/src/pkg/store/store.go b/weed-fs/src/pkg/store/store.go
index a568fb199..c287aaaa8 100644
--- a/weed-fs/src/pkg/store/store.go
+++ b/weed-fs/src/pkg/store/store.go
@@ -1,39 +1,69 @@
package store
import (
- "log"
- "strconv"
+ "log"
+ "io/ioutil"
+ "json"
+ "strings"
+ "strconv"
+ "url"
)
-type Store struct{
- volumes []*Volume
- dir string
-
- freeVolumeChannel chan int
+
+type Store struct {
+ volumes map[uint64]*Volume
+ dir string
+ Server string
+ PublicServer string
+}
+type VolumeStat struct {
+ Id uint64 "id"
+ Status int "status" //0:read, 1:write
+}
+
+func NewStore(server, publicServer, dirname string) (s *Store) {
+ s = new(Store)
+ s.Server, s.PublicServer, s.dir = server, publicServer, dirname
+ s.volumes = make(map[uint64]*Volume)
+
+ counter := uint64(0)
+ files, _ := ioutil.ReadDir(dirname)
+ for _, f := range files {
+ if f.IsDirectory() || !strings.HasSuffix(f.Name, ".dat") {
+ continue
+ }
+ id, err := strconv.Atoui64(f.Name[:-4])
+ if err == nil {
+ continue
+ }
+ s.volumes[counter] = NewVolume(s.dir, id)
+ counter++
+ }
+ log.Println("Store started on dir:", dirname, "with", counter, "existing volumes")
+ return
}
-func NewStore(dirname string, count int) (s *Store){
- s = new(Store)
- s.dir = dirname
- s.volumes = make([]*Volume,count)
- s.freeVolumeChannel = make(chan int, count)
- for i:=0;i<count;i++{
- s.volumes[i] = NewVolume(s.dir, strconv.Itob(i,16))
- s.freeVolumeChannel <- i
- }
- log.Println("Store started on dir:", dirname, "with", count,"volumes");
- return
+
+func (s *Store) Join(mserver string) {
+ stats := make([]*VolumeStat, len(s.volumes))
+ for k, _ := range s.volumes {
+ s := new(VolumeStat)
+ s.Id, s.Status = k, 1
+ stats = append(stats, s)
+ }
+ bytes, _ := json.Marshal(stats)
+ values := new(url.Values)
+ values.Add("server", s.Server)
+ values.Add("publicServer", s.PublicServer)
+ values.Add("volumes", string(bytes))
+ post("http://"+mserver+"/join", *values)
}
-func (s *Store)Close(){
- close(s.freeVolumeChannel)
- for _, v := range s.volumes{
- v.Close()
- }
+func (s *Store) Close() {
+ for _, v := range s.volumes {
+ v.Close()
+ }
}
-func (s *Store)Write(n *Needle)(int){
- i := <- s.freeVolumeChannel
- s.volumes[i].write(n)
- s.freeVolumeChannel <- i
- return i
+func (s *Store) Write(i uint64, n *Needle) {
+ s.volumes[i].write(n)
}
-func (s *Store)Read(i int, n *Needle){
- s.volumes[i].read(n)
+func (s *Store) Read(i uint64, n *Needle) {
+ s.volumes[i].read(n)
}
diff --git a/weed-fs/src/pkg/store/util.go b/weed-fs/src/pkg/store/util.go
index 258dec482..41b129683 100644
--- a/weed-fs/src/pkg/store/util.go
+++ b/weed-fs/src/pkg/store/util.go
@@ -1,5 +1,12 @@
package store
+import (
+ "http"
+ "io/ioutil"
+ "url"
+ "log"
+)
+
func bytesToUint64(b []byte)(v uint64){
for i :=uint(7);i>0;i-- {
v += uint64(b[i])
@@ -26,3 +33,18 @@ func uint32toBytes(b []byte, v uint32){
b[i] = byte(v>>(i*8))
}
}
+
+func post(url string, values url.Values)string{
+ r, err := http.PostForm(url, values)
+ if err != nil {
+ log.Println("post:", err)
+ return ""
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Println("post:", err)
+ return ""
+ }
+ return string(b)
+} \ No newline at end of file
diff --git a/weed-fs/src/pkg/store/volume.go b/weed-fs/src/pkg/store/volume.go
index 33b010e62..c32f4af10 100644
--- a/weed-fs/src/pkg/store/volume.go
+++ b/weed-fs/src/pkg/store/volume.go
@@ -2,29 +2,32 @@ package store
import (
"os"
+ "path"
+ "strconv"
"log"
)
type Volume struct {
+ Id uint64
dir string
- fileName string
dataFile, indexFile *os.File
nm *NeedleMap
accessChannel chan int
}
-func NewVolume(dirname string, filename string) (v *Volume) {
+func NewVolume(dirname string, id uint64) (v *Volume) {
var e os.Error
v = new(Volume)
v.dir = dirname
- v.fileName = filename
- log.Println("file", v.dir, "/", v.fileName)
- v.dataFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ v.Id = id
+ fileName := strconv.Uitoa64(v.Id)
+ log.Println("file", v.dir, "/", fileName)
+ v.dataFile, e = os.OpenFile(path.Join(v.dir,fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e)
}
- v.indexFile, e = os.OpenFile(v.dir+string(os.PathSeparator)+v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ v.indexFile, e = os.OpenFile(path.Join(v.dir,fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e)
}