diff options
| author | Chris Lu <chris.lu@gmail.com> | 2014-09-20 12:38:59 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2014-09-20 12:38:59 -0700 |
| commit | b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999 (patch) | |
| tree | 719442dc72cc30958e54e4f7e59076796b6775e9 /go/weed | |
| parent | a092794804b2f7cbd656e439305d29bfa96ad2b9 (diff) | |
| download | seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.tar.xz seaweedfs-b9aee2defbc2f5aafbc3ea049fbe2ab5f3320999.zip | |
add TTL support
The volume TTL and file TTL are not necessarily the same. as long as
file TTL is smaller than volume TTL, it'll be fine.
volume TTL is used when assigning file id, e.g.
http://.../dir/assign?ttl=3h
file TTL is used when uploading
Diffstat (limited to 'go/weed')
| -rw-r--r-- | go/weed/benchmark.go | 2 | ||||
| -rw-r--r-- | go/weed/compact.go | 2 | ||||
| -rw-r--r-- | go/weed/upload.go | 6 | ||||
| -rw-r--r-- | go/weed/weed_server/common.go | 4 | ||||
| -rw-r--r-- | go/weed/weed_server/filer_server_handlers.go | 12 | ||||
| -rw-r--r-- | go/weed/weed_server/master_server_handlers_admin.go | 9 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server_handlers_admin.go | 2 |
7 files changed, 22 insertions, 15 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index eab923751..27aebaef0 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -201,7 +201,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { start := time.Now() fileSize := int64(*b.fileSize + rand.Intn(64)) fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} - if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection if _, ok := serverLimitChan[fp.Server]; !ok { serverLimitChan[fp.Server] = make(chan bool, 7) diff --git a/go/weed/compact.go b/go/weed/compact.go index 580f3f98d..57a02261f 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool { } vid := storage.VolumeId(*compactVolumeId) - v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil, nil) if err != nil { glog.Fatalf("Load Volume [ERROR] %s\n", err) } diff --git a/go/weed/upload.go b/go/weed/upload.go index b59313a2a..c21499dd0 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -12,6 +12,7 @@ var ( uploadReplication *string uploadCollection *string uploadDir *string + uploadTtl *string include *string maxMB *int ) @@ -24,6 +25,7 @@ func init() { include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") uploadReplication = cmdUpload.Flag.String("replication", "", "replication type") uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name") + uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") } @@ -67,7 +69,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -84,7 +86,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { fmt.Println(e.Error()) } - results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *maxMB) + results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) } diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index a547d7462..49e84378c 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -99,14 +99,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) + fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r) if pe != nil { writeJsonError(w, r, pe) return } debug("assigning file id for", fname) - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection")) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) if ae != nil { writeJsonError(w, r, ae) return diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index 0f83352a9..f760030f3 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -109,7 +109,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection) + assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl")) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, ae) @@ -131,14 +131,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } resp, do_err := util.Do(request) if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) writeJsonError(w, r, do_err) return } defer resp.Body.Close() resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", ra_err.Error()) + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) writeJsonError(w, r, ra_err) return } @@ -146,12 +146,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { var ret operation.UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", string(resp_body)) + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) writeJsonError(w, r, unmarshal_err) return } if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) writeJsonError(w, r, errors.New(ret.Error)) return } @@ -169,7 +169,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { glog.V(4).Infoln("saving", path, "=>", assignResult.Fid) if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil { operation.DeleteFile(fs.master, assignResult.Fid) //clean up - glog.V(0).Infoln("failing to write to filer server", db_err.Error()) + glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error()) writeJsonError(w, r, db_err) return } diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go index d50075fd5..1458bf3e6 100644 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -55,7 +55,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { } } - ms.Topo.RegisterVolumes(joinMessage) + ms.Topo.ProcessJoinMessage(joinMessage) writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024}) } @@ -144,7 +144,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -157,9 +157,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + ttl, err := storage.ReadTTL(r.FormValue("ttl")) + if err != nil { + return nil, err + } volumeGrowOption := &topology.VolumeGrowOption{ Collection: r.FormValue("collection"), ReplicaPlacement: replicaPlacement, + Ttl: ttl, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), DataNode: r.FormValue("dataNode"), diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go index 6d285524a..1c01fdfd0 100644 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ b/go/weed/weed_server/volume_server_handlers_admin.go @@ -16,7 +16,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication")) + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl")) if err == nil { writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { |
