aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-26 03:27:10 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-26 03:27:10 -0700
commit4b3676a54b3cebeb957b79dc2b74a5d474e4259a (patch)
treea6489d875114f3ebea75c221c52a8c7907f72d15
parentb5c29e25aa76be758f3cbac8d6d8860476f75f0e (diff)
downloadseaweedfs-4b3676a54b3cebeb957b79dc2b74a5d474e4259a.tar.xz
seaweedfs-4b3676a54b3cebeb957b79dc2b74a5d474e4259a.zip
delete replications, untested yet
-rw-r--r--weed-fs/src/cmd/weed/volume.go67
-rw-r--r--weed-fs/src/pkg/operation/delete_content.go14
-rw-r--r--weed-fs/src/pkg/operation/upload_content.go64
3 files changed, 88 insertions, 57 deletions
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 798fe6995..ec442716a 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -135,27 +135,11 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
ret := store.Write(volumeId, needle)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" {
- if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
- sendFunc := func(background bool) {
- postContentFunc := func(location operation.Location) bool {
- operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
- return true
- }
- for _, location := range lookupResult.Locations {
- if location.Url != (*ip+":"+strconv.Itoa(*vport)) {
- if background {
- go postContentFunc(location)
- } else {
- postContentFunc(location)
- }
- }
- }
- }
- waitTime, err := strconv.Atoi(r.FormValue("wait"))
- sendFunc(err == nil && waitTime > 0)
- } else {
- log.Println("Failed to lookup for", volumeId, lookupErr.Error())
- }
+ waitTime, err := strconv.Atoi(r.FormValue("wait"))
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
+ return true
+ }, err == nil && waitTime > 0)
}
w.WriteHeader(http.StatusCreated)
}
@@ -191,7 +175,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
n.Size = 0
- store.Delete(volumeId, n)
+ ret := store.Delete(volumeId, n)
+
+ if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
+ if r.FormValue("type") != "standard" {
+ waitTime, err := strconv.Atoi(r.FormValue("wait"))
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ return true
+ }, err == nil && waitTime > 0)
+ }
+ w.WriteHeader(http.StatusCreated)
+ }
+
m := make(map[string]uint32)
m["size"] = uint32(count)
writeJson(w, r, m)
@@ -217,6 +213,27 @@ func parseURLPath(path string) (vid, fid, ext string) {
return
}
+type distributedFunction func(location operation.Location) bool
+
+func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) {
+ if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
+ sendFunc := func(background bool) {
+ for _, location := range lookupResult.Locations {
+ if location.Url != (*ip + ":" + strconv.Itoa(*vport)) {
+ if background {
+ go op(location)
+ } else {
+ op(location)
+ }
+ }
+ }
+ }
+ sendFunc(wait)
+ } else {
+ log.Println("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+}
+
func runVolume(cmd *Command, args []string) bool {
fileInfo, err := os.Stat(*volumeFolder)
//TODO: now default to 1G, this value should come from server?
@@ -228,9 +245,9 @@ func runVolume(cmd *Command, args []string) bool {
}
perm := fileInfo.Mode().Perm()
log.Println("Volume Folder permission:", perm)
-
+
if *publicUrl == "" {
- *publicUrl = *ip + ":" + strconv.Itoa(*vport)
+ *publicUrl = *ip + ":" + strconv.Itoa(*vport)
}
store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
@@ -247,7 +264,7 @@ func runVolume(cmd *Command, args []string) bool {
}()
log.Println("store joined at", *masterNode)
- log.Println("Start storage service at http://"+*ip+":"+strconv.Itoa(*vport))
+ log.Println("Start storage service at http://" + *ip + ":" + strconv.Itoa(*vport))
e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil)
if e != nil {
log.Fatalf("Fail to start:%s", e.Error())
diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go
new file mode 100644
index 000000000..66a61f55c
--- /dev/null
+++ b/weed-fs/src/pkg/operation/delete_content.go
@@ -0,0 +1,14 @@
+package operation
+
+import (
+ "net/http"
+)
+
+func Delete(url string) error {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ return err
+ }
+ _, err = http.DefaultClient.Do(req)
+ return err
+}
diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go
index e7822d387..ce2e5af68 100644
--- a/weed-fs/src/pkg/operation/upload_content.go
+++ b/weed-fs/src/pkg/operation/upload_content.go
@@ -1,42 +1,42 @@
package operation
import (
- "bytes"
- "encoding/json"
- "mime/multipart"
- "net/http"
- _ "fmt"
- "io"
- "io/ioutil"
+ "bytes"
+ "encoding/json"
+ _ "fmt"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
)
type UploadResult struct {
- Size int
+ Size int
}
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
- println("uploading to", uploadUrl)
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
- file_writer, err := body_writer.CreateFormFile("file", filename)
- io.Copy(file_writer, reader)
- content_type := body_writer.FormDataContentType()
- body_writer.Close()
- resp, err := http.Post(uploadUrl, content_type, body_buf)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- resp_body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var ret UploadResult
- println("upload response to", uploadUrl, resp_body)
- err = json.Unmarshal(resp_body, &ret)
- if err != nil {
- panic(err.Error())
- }
- //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
- return &ret, nil
+ body_buf := bytes.NewBufferString("")
+ body_writer := multipart.NewWriter(body_buf)
+ file_writer, err := body_writer.CreateFormFile("file", filename)
+ io.Copy(file_writer, reader)
+ content_type := body_writer.FormDataContentType()
+ body_writer.Close()
+ resp, err := http.Post(uploadUrl, content_type, body_buf)
+ if err != nil {
+ println("uploading to", uploadUrl)
+ return nil, err
+ }
+ defer resp.Body.Close()
+ resp_body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+ var ret UploadResult
+ err = json.Unmarshal(resp_body, &ret)
+ if err != nil {
+ println("upload response to", uploadUrl, resp_body)
+ panic(err.Error())
+ }
+ //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
+ return &ret, nil
}