aboutsummaryrefslogtreecommitdiff
path: root/go/weed
diff options
context:
space:
mode:
authorchrislusf <chris.lu@gmail.com>2015-05-26 00:58:41 -0700
committerchrislusf <chris.lu@gmail.com>2015-05-26 00:58:41 -0700
commit86cd40fba87f7e69c10d1e66e967f6e5e40605b6 (patch)
tree0bec704ff7aa8c0dd89c2f4852844a230e46c821 /go/weed
parent7272af8ec45426151593fcd7d1a4d5a8092d5de6 (diff)
downloadseaweedfs-86cd40fba87f7e69c10d1e66e967f6e5e40605b6.tar.xz
seaweedfs-86cd40fba87f7e69c10d1e66e967f6e5e40605b6.zip
Add "weed backup" command.
This is a pre-cursor for asynchronous replication.
Diffstat (limited to 'go/weed')
-rw-r--r--go/weed/backup.go90
-rw-r--r--go/weed/compact.go1
-rw-r--r--go/weed/weed.go1
-rw-r--r--go/weed/weed_server/volume_server.go3
-rw-r--r--go/weed/weed_server/volume_server_handlers_read.go2
-rw-r--r--go/weed/weed_server/volume_server_handlers_sync.go86
-rw-r--r--go/weed/weed_server/volume_server_handlers_write.go4
7 files changed, 183 insertions, 4 deletions
diff --git a/go/weed/backup.go b/go/weed/backup.go
new file mode 100644
index 000000000..5e51a8b03
--- /dev/null
+++ b/go/weed/backup.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/go/operation"
+ "github.com/chrislusf/seaweedfs/go/storage"
+)
+
+var (
+ s BackupOptions
+)
+
+type BackupOptions struct {
+ master *string
+ collection *string
+ dir *string
+ volumeId *int
+}
+
+func init() {
+ cmdBackup.Run = runBackup // break init cycle
+ s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location")
+ s.collection = cmdBackup.Flag.String("collection", "", "collection name")
+ s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
+ s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
+}
+
+var cmdBackup = &Command{
+ UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333",
+ Short: "incrementally backup a volume to local folder",
+ Long: `Incrementally backup volume data.
+
+ It is expected that you use this inside a script, to loop through
+ all possible volume ids that needs to be backup to local folder.
+
+ The volume id does not need to exist locally or even remotely.
+ This will help to backup future new volumes.
+
+ Usually backing up is just copying the .dat (and .idx) files.
+ But it's tricky to incremententally copy the differences.
+
+ The complexity comes when there are multiple addition, deletion and compaction.
+ This tool will handle them correctly and efficiently, avoiding unnecessary data transporation.
+ `,
+}
+
+func runBackup(cmd *Command, args []string) bool {
+ if *s.volumeId == -1 {
+ return false
+ }
+ vid := storage.VolumeId(*s.volumeId)
+
+ // find volume location, replication, ttl info
+ lookup, err := operation.Lookup(*s.master, vid.String())
+ if err != nil {
+ fmt.Printf("Error looking up volume %d: %v\n", vid, err)
+ return true
+ }
+ volumeServer := lookup.Locations[0].Url
+
+ stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String())
+ if err != nil {
+ fmt.Printf("Error get volume %d status: %v\n", vid, err)
+ return true
+ }
+ ttl, err := storage.ReadTTL(stats.Ttl)
+ if err != nil {
+ fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
+ return true
+ }
+ replication, err := storage.NewReplicaPlacementFromString(stats.Replication)
+ if err != nil {
+ fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
+ return true
+ }
+
+ v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl)
+ if err != nil {
+ fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
+ return true
+ }
+
+ if err := v.Synchronize(volumeServer); err != nil {
+ fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
+ return true
+ }
+
+ return true
+}
diff --git a/go/weed/compact.go b/go/weed/compact.go
index a599f5d64..673b96901 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -7,7 +7,6 @@ import (
func init() {
cmdCompact.Run = runCompact // break init cycle
- cmdCompact.IsDebug = cmdCompact.Flag.Bool("debug", false, "enable debug mode")
}
var cmdCompact = &Command{
diff --git a/go/weed/weed.go b/go/weed/weed.go
index b3dd61616..49fe17eaa 100644
--- a/go/weed/weed.go
+++ b/go/weed/weed.go
@@ -21,6 +21,7 @@ var server *string
var commands = []*Command{
cmdBenchmark,
+ cmdBackup,
cmdCompact,
cmdFix,
cmdServer,
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index 6e0a7f536..703536c7a 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -51,6 +51,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
+ adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
+ adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
+ adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go
index a8156e85b..86dfee560 100644
--- a/go/weed/weed_server/volume_server_handlers_read.go
+++ b/go/weed/weed_server/volume_server_handlers_read.go
@@ -47,7 +47,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
- count, e := vs.store.Read(volumeId, n)
+ count, e := vs.store.ReadVolumeNeedle(volumeId, n)
glog.V(4).Infoln("read bytes", count, "error", e)
if e != nil || count <= 0 {
glog.V(0).Infoln("read error:", e, r.URL.Path)
diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go
new file mode 100644
index 000000000..c650e5f53
--- /dev/null
+++ b/go/weed/weed_server/volume_server_handlers_sync.go
@@ -0,0 +1,86 @@
+package weed_server
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/go/glog"
+ "github.com/chrislusf/seaweedfs/go/storage"
+ "github.com/chrislusf/seaweedfs/go/util"
+)
+
+func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) {
+ v, err := vs.getVolume("volume", r)
+ if v == nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ syncStat := v.GetVolumeSyncStatus()
+ if syncStat.Error != "" {
+ writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error))
+ glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err)
+ } else {
+ writeJsonQuiet(w, r, http.StatusOK, syncStat)
+ }
+}
+
+func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) {
+ v, err := vs.getVolume("volume", r)
+ if v == nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ content, err := v.IndexFileContent()
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ w.Write(content)
+}
+
+func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) {
+ v, err := vs.getVolume("volume", r)
+ if v == nil {
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err))
+ return
+ }
+ if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) {
+ writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision))
+ return
+ }
+ offset := uint32(util.ParseUint64(r.FormValue("offset"), 0))
+ size := uint32(util.ParseUint64(r.FormValue("size"), 0))
+ content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+
+ id := util.ParseUint64(r.FormValue("id"), 0)
+ n := new(storage.Needle)
+ n.ParseNeedleHeader(content)
+ if id != n.Id {
+ writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id))
+ return
+ }
+
+ w.Write(content)
+}
+
+func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) {
+ volumeIdString := r.FormValue(volumeParameterName)
+ if volumeIdString == "" {
+ err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName)
+ return nil, err
+ }
+ vid, err := storage.NewVolumeId(volumeIdString)
+ if err != nil {
+ err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString)
+ return nil, err
+ }
+ v := vs.store.GetVolume(vid)
+ if v == nil {
+ return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid)
+ }
+ return v, nil
+}
diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go
index fdede5562..2f7e79ce9 100644
--- a/go/weed/weed_server/volume_server_handlers_write.go
+++ b/go/weed/weed_server/volume_server_handlers_write.go
@@ -53,7 +53,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
glog.V(2).Infoln("deleting", n)
cookie := n.Cookie
- count, ok := vs.store.Read(volumeId, n)
+ count, ok := vs.store.ReadVolumeNeedle(volumeId, n)
if ok != nil {
m := make(map[string]uint32)
@@ -94,7 +94,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
n.ParsePath(id_cookie)
glog.V(4).Infoln("batch deleting", n)
cookie := n.Cookie
- if _, err := vs.store.Read(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()})
continue
}