aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/cmd
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-26 03:27:10 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-26 03:27:10 -0700
commit4b3676a54b3cebeb957b79dc2b74a5d474e4259a (patch)
treea6489d875114f3ebea75c221c52a8c7907f72d15 /weed-fs/src/cmd
parentb5c29e25aa76be758f3cbac8d6d8860476f75f0e (diff)
downloadseaweedfs-4b3676a54b3cebeb957b79dc2b74a5d474e4259a.tar.xz
seaweedfs-4b3676a54b3cebeb957b79dc2b74a5d474e4259a.zip
delete replications, untested yet
Diffstat (limited to 'weed-fs/src/cmd')
-rw-r--r--weed-fs/src/cmd/weed/volume.go67
1 files changed, 42 insertions, 25 deletions
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 798fe6995..ec442716a 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -135,27 +135,11 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
ret := store.Write(volumeId, needle)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" {
- if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
- sendFunc := func(background bool) {
- postContentFunc := func(location operation.Location) bool {
- operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
- return true
- }
- for _, location := range lookupResult.Locations {
- if location.Url != (*ip+":"+strconv.Itoa(*vport)) {
- if background {
- go postContentFunc(location)
- } else {
- postContentFunc(location)
- }
- }
- }
- }
- waitTime, err := strconv.Atoi(r.FormValue("wait"))
- sendFunc(err == nil && waitTime > 0)
- } else {
- log.Println("Failed to lookup for", volumeId, lookupErr.Error())
- }
+ waitTime, err := strconv.Atoi(r.FormValue("wait"))
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
+ return true
+ }, err == nil && waitTime > 0)
}
w.WriteHeader(http.StatusCreated)
}
@@ -191,7 +175,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
n.Size = 0
- store.Delete(volumeId, n)
+ ret := store.Delete(volumeId, n)
+
+ if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
+ if r.FormValue("type") != "standard" {
+ waitTime, err := strconv.Atoi(r.FormValue("wait"))
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ return true
+ }, err == nil && waitTime > 0)
+ }
+ w.WriteHeader(http.StatusCreated)
+ }
+
m := make(map[string]uint32)
m["size"] = uint32(count)
writeJson(w, r, m)
@@ -217,6 +213,27 @@ func parseURLPath(path string) (vid, fid, ext string) {
return
}
+type distributedFunction func(location operation.Location) bool
+
+func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) {
+ if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
+ sendFunc := func(background bool) {
+ for _, location := range lookupResult.Locations {
+ if location.Url != (*ip + ":" + strconv.Itoa(*vport)) {
+ if background {
+ go op(location)
+ } else {
+ op(location)
+ }
+ }
+ }
+ }
+ sendFunc(wait)
+ } else {
+ log.Println("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+}
+
func runVolume(cmd *Command, args []string) bool {
fileInfo, err := os.Stat(*volumeFolder)
//TODO: now default to 1G, this value should come from server?
@@ -228,9 +245,9 @@ func runVolume(cmd *Command, args []string) bool {
}
perm := fileInfo.Mode().Perm()
log.Println("Volume Folder permission:", perm)
-
+
if *publicUrl == "" {
- *publicUrl = *ip + ":" + strconv.Itoa(*vport)
+ *publicUrl = *ip + ":" + strconv.Itoa(*vport)
}
store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
@@ -247,7 +264,7 @@ func runVolume(cmd *Command, args []string) bool {
}()
log.Println("store joined at", *masterNode)
- log.Println("Start storage service at http://"+*ip+":"+strconv.Itoa(*vport))
+ log.Println("Start storage service at http://" + *ip + ":" + strconv.Itoa(*vport))
e := http.ListenAndServe(":"+strconv.Itoa(*vport), nil)
if e != nil {
log.Fatalf("Fail to start:%s", e.Error())