aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-20 17:58:29 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-20 17:58:29 -0700
commit5e97179d061fd885ab5df0d91c1713a5139ca112 (patch)
tree125f4d7900509d09eed08a57dc969a64cef77767
parent7d8e9f829c11dd8151144e20506e673d2f3b9d27 (diff)
downloadseaweedfs-5e97179d061fd885ab5df0d91c1713a5139ca112.tar.xz
seaweedfs-5e97179d061fd885ab5df0d91c1713a5139ca112.zip
refactoring content uploading
-rw-r--r--weed-fs/note/replication.txt3
-rw-r--r--weed-fs/src/cmd/weed/upload.go51
-rw-r--r--weed-fs/src/cmd/weed/volume.go24
-rw-r--r--weed-fs/src/pkg/operation/lookup_volume_id.go (renamed from weed-fs/src/pkg/operation/lookup.go)0
-rw-r--r--weed-fs/src/pkg/operation/upload_content.go40
-rw-r--r--weed-fs/src/pkg/storage/store.go3
6 files changed, 74 insertions, 47 deletions
diff --git a/weed-fs/note/replication.txt b/weed-fs/note/replication.txt
index 0dd73fd90..c4bf46044 100644
--- a/weed-fs/note/replication.txt
+++ b/weed-fs/note/replication.txt
@@ -82,4 +82,5 @@ For the above operations, here are the todo list:
5. accept lookup for volume locations ALREADY EXISTS /dir/lookup
6. read topology/datacenter/rack layout
-
+TODO:
+ 1. replicate content to the other server if the replication type needs replicas
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
index 515816921..c025c5029 100644
--- a/weed-fs/src/cmd/weed/upload.go
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -1,16 +1,12 @@
package main
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
- "io"
- "io/ioutil"
- "mime/multipart"
- "net/http"
"net/url"
"os"
+ "pkg/operation"
"pkg/util"
"strconv"
)
@@ -64,23 +60,10 @@ func assign(count int) (*AssignResult, error) {
return &ret, nil
}
-type UploadResult struct {
- Size int
-}
-
-func upload(filename string, uploadUrl string) (int, string) {
+func upload(filename string, server string, fid string) (int) {
if *IsDebug {
fmt.Println("Start uploading file:", filename)
}
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
- file_writer, err := body_writer.CreateFormFile("file", filename)
- if err != nil {
- if *IsDebug {
- fmt.Println("Failed to create form file:", filename)
- }
- panic(err.Error())
- }
fh, err := os.Open(filename)
if err != nil {
if *IsDebug {
@@ -88,31 +71,8 @@ func upload(filename string, uploadUrl string) (int, string) {
}
panic(err.Error())
}
- io.Copy(file_writer, fh)
- content_type := body_writer.FormDataContentType()
- body_writer.Close()
- resp, err := http.Post(uploadUrl, content_type, body_buf)
- if err != nil {
- if *IsDebug {
- fmt.Println("Failed to upload file to", uploadUrl)
- }
- panic(err.Error())
- }
- defer resp.Body.Close()
- resp_body, err := ioutil.ReadAll(resp.Body)
- if *IsDebug {
- fmt.Println("Upload response:", string(resp_body))
- }
- if err != nil {
- panic(err.Error())
- }
- var ret UploadResult
- err = json.Unmarshal(resp_body, &ret)
- if err != nil {
- panic(err.Error())
- }
- //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
- return ret.Size, uploadUrl
+ ret, _ := operation.Upload(server, fid, filename, fh)
+ return ret.Size
}
type SubmitResult struct {
@@ -131,8 +91,7 @@ func submit(files []string) []SubmitResult {
if index > 0 {
fid = fid + "_" + strconv.Itoa(index)
}
- uploadUrl := "http://" + ret.PublicUrl + "/" + fid
- results[index].Size, _ = upload(file, uploadUrl)
+ results[index].Size = upload(file, ret.PublicUrl, fid)
results[index].Fid = fid
}
return results
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 3b41a60d1..af4ec8d5e 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -137,6 +137,30 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, ne)
} else {
ret := store.Write(volumeId, needle)
+ if ret > 0 { //send to other replica locations
+ if r.FormValue("type") != "standard" {
+ waitTime, err := strconv.Atoi(r.FormValue("wait"))
+ lookupResult, lookupErr := operation.Lookup(*server, volumeId)
+ if lookupErr == nil {
+ sendFunc := func(background bool) {
+ postContentFunc := func(location operation.Location) bool{
+
+ return true
+ }
+ for _, location := range lookupResult.Locations {
+ if background {
+ go postContentFunc(location)
+ }else{
+ postContentFunc(location)
+ }
+ }
+ }
+ sendFunc(err == nil && waitTime > 0)
+ } else {
+ log.Println("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+ }
+ }
m := make(map[string]uint32)
m["size"] = ret
writeJson(w, r, m)
diff --git a/weed-fs/src/pkg/operation/lookup.go b/weed-fs/src/pkg/operation/lookup_volume_id.go
index 720286c31..720286c31 100644
--- a/weed-fs/src/pkg/operation/lookup.go
+++ b/weed-fs/src/pkg/operation/lookup_volume_id.go
diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go
new file mode 100644
index 000000000..83e20bf3b
--- /dev/null
+++ b/weed-fs/src/pkg/operation/upload_content.go
@@ -0,0 +1,40 @@
+package operation
+
+import (
+ "bytes"
+ "encoding/json"
+ "mime/multipart"
+ "net/http"
+ _ "fmt"
+ "io"
+ "io/ioutil"
+)
+
+type UploadResult struct {
+ Size int
+}
+
+func Upload(server string, fid string, filename string, reader io.Reader) (*UploadResult, error) {
+ body_buf := bytes.NewBufferString("")
+ body_writer := multipart.NewWriter(body_buf)
+ file_writer, err := body_writer.CreateFormFile("file", filename)
+ io.Copy(file_writer, reader)
+ content_type := body_writer.FormDataContentType()
+ body_writer.Close()
+ resp, err := http.Post("http://"+server+"/"+fid, content_type, body_buf)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ resp_body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ var ret UploadResult
+ err = json.Unmarshal(resp_body, &ret)
+ if err != nil {
+ panic(err.Error())
+ }
+ //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
+ return &ret, nil
+}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index d1155bb78..4f7e66ef0 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -135,6 +135,9 @@ func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
}
return 0, errors.New("Not Found")
}
+func (s *Store) GetVolume(i VolumeId) *Volume {
+ return s.volumes[i]
+}
func (s *Store) HasVolume(i VolumeId) bool {
_, ok := s.volumes[i]