diff options
| author | chrislusf <chris.lu@gmail.com> | 2015-05-26 00:58:41 -0700 |
|---|---|---|
| committer | chrislusf <chris.lu@gmail.com> | 2015-05-26 00:58:41 -0700 |
| commit | 86cd40fba87f7e69c10d1e66e967f6e5e40605b6 (patch) | |
| tree | 0bec704ff7aa8c0dd89c2f4852844a230e46c821 /go/weed | |
| parent | 7272af8ec45426151593fcd7d1a4d5a8092d5de6 (diff) | |
| download | seaweedfs-86cd40fba87f7e69c10d1e66e967f6e5e40605b6.tar.xz seaweedfs-86cd40fba87f7e69c10d1e66e967f6e5e40605b6.zip | |
Add "weed backup" command.
This is a pre-cursor for asynchronous replication.
Diffstat (limited to 'go/weed')
| -rw-r--r-- | go/weed/backup.go | 90 | ||||
| -rw-r--r-- | go/weed/compact.go | 1 | ||||
| -rw-r--r-- | go/weed/weed.go | 1 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server.go | 3 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server_handlers_read.go | 2 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server_handlers_sync.go | 86 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server_handlers_write.go | 4 |
7 files changed, 183 insertions, 4 deletions
diff --git a/go/weed/backup.go b/go/weed/backup.go new file mode 100644 index 000000000..5e51a8b03 --- /dev/null +++ b/go/weed/backup.go @@ -0,0 +1,90 @@ +package main + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/storage" +) + +var ( + s BackupOptions +) + +type BackupOptions struct { + master *string + collection *string + dir *string + volumeId *int +} + +func init() { + cmdBackup.Run = runBackup // break init cycle + s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location") + s.collection = cmdBackup.Flag.String("collection", "", "collection name") + s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files") + s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") +} + +var cmdBackup = &Command{ + UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333", + Short: "incrementally backup a volume to local folder", + Long: `Incrementally backup volume data. + + It is expected that you use this inside a script, to loop through + all possible volume ids that needs to be backup to local folder. + + The volume id does not need to exist locally or even remotely. + This will help to backup future new volumes. + + Usually backing up is just copying the .dat (and .idx) files. + But it's tricky to incremententally copy the differences. + + The complexity comes when there are multiple addition, deletion and compaction. + This tool will handle them correctly and efficiently, avoiding unnecessary data transporation. + `, +} + +func runBackup(cmd *Command, args []string) bool { + if *s.volumeId == -1 { + return false + } + vid := storage.VolumeId(*s.volumeId) + + // find volume location, replication, ttl info + lookup, err := operation.Lookup(*s.master, vid.String()) + if err != nil { + fmt.Printf("Error looking up volume %d: %v\n", vid, err) + return true + } + volumeServer := lookup.Locations[0].Url + + stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + fmt.Printf("Error get volume %d status: %v\n", vid, err) + return true + } + ttl, err := storage.ReadTTL(stats.Ttl) + if err != nil { + fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) + return true + } + replication, err := storage.NewReplicaPlacementFromString(stats.Replication) + if err != nil { + fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) + return true + } + + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl) + if err != nil { + fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) + return true + } + + if err := v.Synchronize(volumeServer); err != nil { + fmt.Printf("Error synchronizing volume %d: %v\n", vid, err) + return true + } + + return true +} diff --git a/go/weed/compact.go b/go/weed/compact.go index a599f5d64..673b96901 100644 --- a/go/weed/compact.go +++ b/go/weed/compact.go @@ -7,7 +7,6 @@ import ( func init() { cmdCompact.Run = runCompact // break init cycle - cmdCompact.IsDebug = cmdCompact.Flag.Bool("debug", false, "enable debug mode") } var cmdCompact = &Command{ diff --git a/go/weed/weed.go b/go/weed/weed.go index b3dd61616..49fe17eaa 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -21,6 +21,7 @@ var server *string var commands = []*Command{ cmdBenchmark, + cmdBackup, cmdCompact, cmdFix, cmdServer, diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 6e0a7f536..703536c7a 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -51,6 +51,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) + adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) + adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index a8156e85b..86dfee560 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -47,7 +47,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } cookie := n.Cookie - count, e := vs.store.Read(volumeId, n) + count, e := vs.store.ReadVolumeNeedle(volumeId, n) glog.V(4).Infoln("read bytes", count, "error", e) if e != nil || count <= 0 { glog.V(0).Infoln("read error:", e, r.URL.Path) diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go new file mode 100644 index 000000000..c650e5f53 --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -0,0 +1,86 @@ +package weed_server + +import ( + "fmt" + "net/http" + + "github.com/chrislusf/seaweedfs/go/glog" + "github.com/chrislusf/seaweedfs/go/storage" + "github.com/chrislusf/seaweedfs/go/util" +) + +func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + syncStat := v.GetVolumeSyncStatus() + if syncStat.Error != "" { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error)) + glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err) + } else { + writeJsonQuiet(w, r, http.StatusOK, syncStat) + } +} + +func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + content, err := v.IndexFileContent() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + w.Write(content) +} + +func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err)) + return + } + if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) { + writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision)) + return + } + offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) + size := uint32(util.ParseUint64(r.FormValue("size"), 0)) + content, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + + id := util.ParseUint64(r.FormValue("id"), 0) + n := new(storage.Needle) + n.ParseNeedleHeader(content) + if id != n.Id { + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)) + return + } + + w.Write(content) +} + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { + err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) + return nil, err + } + vid, err := storage.NewVolumeId(volumeIdString) + if err != nil { + err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + return nil, err + } + v := vs.store.GetVolume(vid) + if v == nil { + return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + } + return v, nil +} diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go index fdede5562..2f7e79ce9 100644 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ b/go/weed/weed_server/volume_server_handlers_write.go @@ -53,7 +53,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { glog.V(2).Infoln("deleting", n) cookie := n.Cookie - count, ok := vs.store.Read(volumeId, n) + count, ok := vs.store.ReadVolumeNeedle(volumeId, n) if ok != nil { m := make(map[string]uint32) @@ -94,7 +94,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques n.ParsePath(id_cookie) glog.V(4).Infoln("batch deleting", n) cookie := n.Cookie - if _, err := vs.store.Read(volumeId, n); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { ret = append(ret, operation.DeleteResult{Fid: fid, Error: err.Error()}) continue } |
