diff options
Diffstat (limited to 'weed')
51 files changed, 0 insertions, 4799 deletions
diff --git a/weed/command.go b/weed/command.go deleted file mode 100644 index c8d86ca66..000000000 --- a/weed/command.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "strings" -) - -type Command struct { - // Run runs the command. - // The args are the arguments after the command name. - Run func(cmd *Command, args []string) bool - - // UsageLine is the one-line usage message. - // The first word in the line is taken to be the command name. - UsageLine string - - // Short is the short description shown in the 'go help' output. - Short string - - // Long is the long message shown in the 'go help <this-command>' output. - Long string - - // Flag is a set of flags specific to this command. - Flag flag.FlagSet - - IsDebug *bool -} - -// Name returns the command's name: the first word in the usage line. -func (c *Command) Name() string { - name := c.UsageLine - i := strings.Index(name, " ") - if i >= 0 { - name = name[:i] - } - return name -} - -func (c *Command) Usage() { - fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) - fmt.Fprintf(os.Stderr, "Default Usage:\n") - c.Flag.PrintDefaults() - fmt.Fprintf(os.Stderr, "Description:\n") - fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) - os.Exit(2) -} - -// Runnable reports whether the command can be run; otherwise -// it is a documentation pseudo-command such as importpath. -func (c *Command) Runnable() bool { - return c.Run != nil -} diff --git a/weed/directory/file_id.go b/weed/directory/file_id.go deleted file mode 100644 index c9e20eaa9..000000000 --- a/weed/directory/file_id.go +++ /dev/null @@ -1,38 +0,0 @@ -package directory - -import ( - "encoding/hex" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/util" - "strings" -) - -type FileId struct { - VolumeId storage.VolumeId - Key uint64 - Hashcode uint32 -} - -func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { - return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} -} -func ParseFileId(fid string) *FileId { - a := strings.Split(fid, ",") - if len(a) != 2 { - println("Invalid fid", fid, ", split length", len(a)) - return nil - } - vid_string, key_hash_string := a[0], a[1] - volumeId, _ := storage.NewVolumeId(vid_string) - key, hash := storage.ParseKeyHash(key_hash_string) - return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} -} -func (n *FileId) String() string { - bytes := make([]byte, 12) - util.Uint64toBytes(bytes[0:8], n.Key) - util.Uint32toBytes(bytes[8:12], n.Hashcode) - nonzero_index := 0 - for ; bytes[nonzero_index] == 0; nonzero_index++ { - } - return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:]) -} diff --git a/weed/export.go b/weed/export.go deleted file mode 100644 index 44cf85c24..000000000 --- a/weed/export.go +++ /dev/null @@ -1,164 +0,0 @@ -package main - -import ( - "archive/tar" - "bytes" - "fmt" - "log" - "os" - "path" - "code.google.com/p/weed-fs/weed/directory" - "code.google.com/p/weed-fs/weed/storage" - "strconv" - "strings" - "text/template" - "time" -) - -func init() { - cmdExport.Run = runExport // break init cycle - cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode") -} - -const ( - defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}` -) - -var cmdExport = &Command{ - UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}", - Short: "list or export files from one volume data file", - Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified. - - The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}. - - `, -} - -var ( - exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") - exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") - dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") - format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") - tarFh *tar.Writer - tarHeader tar.Header - fnTmpl *template.Template - fnTmplBuf = bytes.NewBuffer(nil) -) - -func runExport(cmd *Command, args []string) bool { - - if *exportVolumeId == -1 { - return false - } - - var err error - if *dest != "" { - if *dest != "-" && !strings.HasSuffix(*dest, ".tar") { - fmt.Println("the output file", *dest, "should be '-' or end with .tar") - return false - } - - if fnTmpl, err = template.New("name").Parse(*format); err != nil { - fmt.Println("cannot parse format " + *format + ": " + err.Error()) - return false - } - - var fh *os.File - if *dest == "-" { - fh = os.Stdout - } else { - if fh, err = os.Create(*dest); err != nil { - log.Fatalf("cannot open output tar %s: %s", *dest, err) - } - } - defer fh.Close() - tarFh = tar.NewWriter(fh) - defer tarFh.Close() - t := time.Now() - tarHeader = tar.Header{Mode: 0644, - ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), - Typeflag: tar.TypeReg, - AccessTime: t, ChangeTime: t} - } - - fileName := strconv.Itoa(*exportVolumeId) - vid := storage.VolumeId(*exportVolumeId) - indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644) - if err != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - - nm := storage.LoadNeedleMap(indexFile) - - var version storage.Version - - err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { - version = superBlock.Version - return nil - }, func(n *storage.Needle, offset uint32) error { - debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) - nv, ok := nm.Get(n.Id) - if ok && nv.Size > 0 { - return walker(vid, n, version) - } else { - if !ok { - debug("This seems deleted", n.Id) - } else { - debug("Id", n.Id, "size", n.Size) - } - } - return nil - }) - if err != nil { - log.Fatalf("Export Volume File [ERROR] %s\n", err) - } - return true -} - -type nameParams struct { - Name string - Id uint64 - Mime string - Key string -} - -func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { - key := directory.NewFileId(vid, n.Id, n.Cookie).String() - if tarFh != nil { - fnTmplBuf.Reset() - if err = fnTmpl.Execute(fnTmplBuf, - nameParams{Name: string(n.Name), - Id: n.Id, - Mime: string(n.Mime), - Key: key, - }, - ); err != nil { - return err - } - nm := fnTmplBuf.String() - - if n.IsGzipped() && path.Ext(nm) != ".gz" { - nm = nm + ".gz" - } - - tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) - if err = tarFh.WriteHeader(&tarHeader); err != nil { - return err - } - _, err = tarFh.Write(n.Data) - } else { - size := n.DataSize - if version == storage.Version1 { - size = n.Size - } - fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n", - key, - n.Name, - size, - n.IsGzipped(), - n.Mime, - ) - } - return -} diff --git a/weed/fix.go b/weed/fix.go deleted file mode 100644 index 85693d9b1..000000000 --- a/weed/fix.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -import ( - "log" - "os" - "path" - "code.google.com/p/weed-fs/weed/storage" - "strconv" -) - -func init() { - cmdFix.Run = runFix // break init cycle - cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdFix = &Command{ - UsageLine: "fix -dir=/tmp -volumeId=234", - Short: "run weed tool fix on index file if corrupted", - Long: `Fix runs the WeedFS fix command to re-create the index .idx file. - - `, -} - -var ( - fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") - fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") -) - -func runFix(cmd *Command, args []string) bool { - - if *fixVolumeId == -1 { - return false - } - - fileName := strconv.Itoa(*fixVolumeId) - indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - - nm := storage.NewNeedleMap(indexFile) - defer nm.Close() - - vid := storage.VolumeId(*fixVolumeId) - err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { - return nil - }, func(n *storage.Needle, offset uint32) error { - debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) - if n.Size > 0 { - count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size) - debug("saved", count, "with error", pe) - } else { - debug("skipping deleted file ...") - nm.Delete(n.Id) - } - return nil - }) - if err != nil { - log.Fatalf("Export Volume File [ERROR] %s\n", err) - } - - return true -} diff --git a/weed/master.go b/weed/master.go deleted file mode 100644 index 78db929f2..000000000 --- a/weed/master.go +++ /dev/null @@ -1,217 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "log" - "net/http" - "code.google.com/p/weed-fs/weed/replication" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "runtime" - "strconv" - "strings" - "time" -) - -func init() { - cmdMaster.Run = runMaster // break init cycle - cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdMaster = &Command{ - UsageLine: "master -port=9333", - Short: "start a master server", - Long: `start a master server to provide volume=>location mapping service - and sequence number of file ids - - `, -} - -var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") - mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") -) - -var topo *topology.Topology -var vg *replication.VolumeGrowth - -func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := topo.Lookup(volumeId) - if machines != nil { - ret := []map[string]string{} - for _, dn := range machines { - ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) - } - writeJson(w, r, map[string]interface{}{"locations": ret}) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) - } -} - -func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c, e := strconv.Atoi(r.FormValue("count")) - if e != nil { - c = 1 - } - repType := r.FormValue("replication") - if repType == "" { - repType = *defaultRepType - } - rt, err := storage.NewReplicationTypeFromString(repType) - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - return - } - if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { - if topo.FreeSpace() <= 0 { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "No free volumes left!"}) - return - } else { - vg.GrowByType(rt, topo) - } - } - fid, count, dn, err := topo.PickForWrite(rt, c) - if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - } -} - -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) - m := make(map[string]interface{}) - m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJson(w, r, m) -} - -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Topology"] = topo.ToMap() - writeJson(w, r, m) -} - -func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = *garbageThreshold - } - debug("garbageThreshold =", gcThreshold) - topo.Vacuum(gcThreshold) - dirStatusHandler(w, r) -} - -func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) - } else { - count, err = vg.GrowByCountAndType(count, rt, topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) - } -} - -func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) -} - -func redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - machines := topo.Lookup(volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } -} - -func runMaster(cmd *Command, args []string) bool { - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) - topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) - vg = replication.NewDefaultVolumeGrowth() - log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) - http.HandleFunc("/vol/grow", volumeGrowHandler) - http.HandleFunc("/vol/status", volumeStatusHandler) - http.HandleFunc("/vol/vacuum", volumeVacuumHandler) - - http.HandleFunc("/", redirectHandler) - - topo.StartRefreshWritableVolumes(*garbageThreshold) - - log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) - srv := &http.Server{ - Addr: ":" + strconv.Itoa(*mport), - Handler: http.DefaultServeMux, - ReadTimeout: time.Duration(*mReadTimeout) * time.Second, - } - e := srv.ListenAndServe() - if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) - } - return true -} diff --git a/weed/operation/allocate_volume.go b/weed/operation/allocate_volume.go deleted file mode 100644 index c20d586cf..000000000 --- a/weed/operation/allocate_volume.go +++ /dev/null @@ -1,32 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - "net/url" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "code.google.com/p/weed-fs/weed/util" -) - -type AllocateVolumeResult struct { - Error string -} - -func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("replicationType", repType.String()) - jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) - if err != nil { - return err - } - var ret AllocateVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go deleted file mode 100644 index 2bdb49651..000000000 --- a/weed/operation/delete_content.go +++ /dev/null @@ -1,16 +0,0 @@ -package operation - -import ( - "log" - "net/http" -) - -func Delete(url string) error { - req, err := http.NewRequest("DELETE", url, nil) - if err != nil { - log.Println("failing to delete", url) - return err - } - _, err = http.DefaultClient.Do(req) - return err -} diff --git a/weed/operation/lookup_volume_id.go b/weed/operation/lookup_volume_id.go deleted file mode 100644 index 67d4ccb39..000000000 --- a/weed/operation/lookup_volume_id.go +++ /dev/null @@ -1,38 +0,0 @@ -package operation - -import ( - "encoding/json" - "errors" - _ "fmt" - "net/url" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/util" -) - -type Location struct { - Url string "url" - PublicUrl string "publicUrl" -} -type LookupResult struct { - Locations []Location "locations" - Error string "error" -} - -//TODO: Add a caching for vid here -func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { - values := make(url.Values) - values.Add("volumeId", vid.String()) - jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) - if err != nil { - return nil, err - } - var ret LookupResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) - } - return &ret, nil -} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go deleted file mode 100644 index 0bdb697da..000000000 --- a/weed/operation/upload_content.go +++ /dev/null @@ -1,47 +0,0 @@ -package operation - -import ( - "bytes" - "encoding/json" - "errors" - _ "fmt" - "io" - "io/ioutil" - "log" - "mime/multipart" - "net/http" -) - -type UploadResult struct { - Size int - Error string -} - -func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - file_writer, err := body_writer.CreateFormFile("file", filename) - io.Copy(file_writer, reader) - content_type := body_writer.FormDataContentType() - body_writer.Close() - resp, err := http.Post(uploadUrl, content_type, body_buf) - if err != nil { - log.Println("failing to upload to", uploadUrl) - return nil, err - } - defer resp.Body.Close() - resp_body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - var ret UploadResult - err = json.Unmarshal(resp_body, &ret) - if err != nil { - log.Println("failing to read upload resonse", uploadUrl, resp_body) - return nil, err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) - } - return &ret, nil -} diff --git a/weed/replication/volume_growth.go b/weed/replication/volume_growth.go deleted file mode 100644 index bb59c9a8e..000000000 --- a/weed/replication/volume_growth.go +++ /dev/null @@ -1,195 +0,0 @@ -package replication - -import ( - "errors" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/operation" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "sync" -) - -/* -This package is created to resolve these replica placement issues: -1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies -2. in time of tight storage, how to reduce replica level -3. optimizing for hot data on faster disk, cold data on cheaper storage, -4. volume allocation for each bucket -*/ - -type VolumeGrowth struct { - copy1factor int - copy2factor int - copy3factor int - copyAll int - - accessLock sync.Mutex -} - -func NewDefaultVolumeGrowth() *VolumeGrowth { - return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} -} - -func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) { - switch repType { - case storage.Copy000: - return vg.GrowByCountAndType(vg.copy1factor, repType, topo) - case storage.Copy001: - return vg.GrowByCountAndType(vg.copy2factor, repType, topo) - case storage.Copy010: - return vg.GrowByCountAndType(vg.copy2factor, repType, topo) - case storage.Copy100: - return vg.GrowByCountAndType(vg.copy2factor, repType, topo) - case storage.Copy110: - return vg.GrowByCountAndType(vg.copy3factor, repType, topo) - case storage.Copy200: - return vg.GrowByCountAndType(vg.copy3factor, repType, topo) - } - return 0, errors.New("Unknown Replication Type!") -} -func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { - vg.accessLock.Lock() - defer vg.accessLock.Unlock() - - counter = 0 - switch repType { - case storage.Copy000: - for i := 0; i < count; i++ { - if ok, server, vid := topo.RandomlyReserveOneVolume(); ok { - if err = vg.grow(topo, *vid, repType, server); err == nil { - counter++ - } - } - } - case storage.Copy001: - for i := 0; i < count; i++ { - //randomly pick one server, and then choose from the same rack - if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { - rack := server1.Parent() - exclusion := make(map[string]topology.Node) - exclusion[server1.String()] = server1 - newNodeList := topology.NewNodeList(rack.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { - counter++ - } - } - } - } - } - case storage.Copy010: - for i := 0; i < count; i++ { - //randomly pick one server, and then choose from the same rack - if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { - rack := server1.Parent() - dc := rack.Parent() - exclusion := make(map[string]topology.Node) - exclusion[rack.String()] = rack - newNodeList := topology.NewNodeList(dc.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { - if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { - counter++ - } - } - } - } - } - case storage.Copy100: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 1) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 2 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { - counter++ - } - } - } - } - case storage.Copy110: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(2, 2) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - dc1, dc2 := picked[0], picked[1] - if dc2.FreeSpace() > dc1.FreeSpace() { - dc1, dc2 = dc2, dc1 - } - if dc1.FreeSpace() > 0 { - if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok { - servers = append(servers, server1) - rack := server1.Parent() - exclusion := make(map[string]topology.Node) - exclusion[rack.String()] = rack - newNodeList := topology.NewNodeList(dc1.Children(), exclusion) - if newNodeList.FreeSpace() > 0 { - if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 { - servers = append(servers, server2) - } - } - } - } - if dc2.FreeSpace() > 0 { - if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok { - servers = append(servers, server) - } - } - if len(servers) == 3 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { - counter++ - } - } - } - } - case storage.Copy200: - for i := 0; i < count; i++ { - nl := topology.NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(3, 1) - vid := topo.NextVolumeId() - if ret { - var servers []*topology.DataNode - for _, n := range picked { - if n.FreeSpace() > 0 { - if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { - servers = append(servers, server) - } - } - } - if len(servers) == 3 { - if err = vg.grow(topo, vid, repType, servers...); err == nil { - counter++ - } - } - } - } - } - return -} -func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { - for _, server := range servers { - if err := operation.AllocateVolume(server, vid, repType); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} - server.AddOrUpdateVolume(vi) - topo.RegisterVolumeLayout(&vi, server) - fmt.Println("Created Volume", vid, "on", server) - } else { - fmt.Println("Failed to assign", vid, "to", servers) - return errors.New("Failed to assign " + vid.String()) - } - } - return nil -} diff --git a/weed/replication/volume_growth_test.go b/weed/replication/volume_growth_test.go deleted file mode 100644 index 4ef0df46c..000000000 --- a/weed/replication/volume_growth_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package replication - -import ( - "encoding/json" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "testing" - "time" -) - -var topologyLayout = ` -{ - "dc1":{ - "rack1":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":2, "size":12312}, - {"id":3, "size":12312} - ], - "limit":3 - }, - "server2":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":10 - } - }, - "rack2":{ - "server1":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":4 - }, - "server2":{ - "volumes":[], - "limit":4 - }, - "server3":{ - "volumes":[ - {"id":2, "size":12312}, - {"id":3, "size":12312}, - {"id":4, "size":12312} - ], - "limit":2 - } - } - }, - "dc2":{ - }, - "dc3":{ - "rack2":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":3, "size":12312}, - {"id":5, "size":12312} - ], - "limit":4 - } - } - } -} -` - -func setup(topologyLayout string) *topology.Topology { - var data interface{} - err := json.Unmarshal([]byte(topologyLayout), &data) - if err != nil { - fmt.Println("error:", err) - } - fmt.Println("data:", data) - - //need to connect all nodes first before server adding volumes - topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) - mTopology := data.(map[string]interface{}) - for dcKey, dcValue := range mTopology { - dc := topology.NewDataCenter(dcKey) - dcMap := dcValue.(map[string]interface{}) - topo.LinkChildNode(dc) - for rackKey, rackValue := range dcMap { - rack := topology.NewRack(rackKey) - rackMap := rackValue.(map[string]interface{}) - dc.LinkChildNode(rack) - for serverKey, serverValue := range rackMap { - server := topology.NewDataNode(serverKey) - serverMap := serverValue.(map[string]interface{}) - rack.LinkChildNode(server) - for _, v := range serverMap["volumes"].([]interface{}) { - m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} - server.AddOrUpdateVolume(vi) - } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) - } - } - } - - return topo -} - -func TestRemoveDataCenter(t *testing.T) { - topo := setup(topologyLayout) - topo.UnlinkChildNode(topology.NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { - t.Fail() - } - topo.UnlinkChildNode(topology.NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { - t.Fail() - } -} - -func TestReserveOneVolume(t *testing.T) { - topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} - if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { - t.Log("reserved", c) - } -} diff --git a/weed/sequence/sequence.go b/weed/sequence/sequence.go deleted file mode 100644 index c85289468..000000000 --- a/weed/sequence/sequence.go +++ /dev/null @@ -1,71 +0,0 @@ -package sequence - -import ( - "encoding/gob" - "log" - "os" - "path" - "sync" -) - -const ( - FileIdSaveInterval = 10000 -) - -type Sequencer interface { - NextFileId(count int) (uint64, int) -} -type SequencerImpl struct { - dir string - fileName string - - volumeLock sync.Mutex - sequenceLock sync.Mutex - - FileIdSequence uint64 - fileIdCounter uint64 -} - -func NewSequencer(dirname string, filename string) (m *SequencerImpl) { - m = &SequencerImpl{dir: dirname, fileName: filename} - - seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) - if se != nil { - m.FileIdSequence = FileIdSaveInterval - log.Println("Setting file id sequence", m.FileIdSequence) - } else { - decoder := gob.NewDecoder(seqFile) - defer seqFile.Close() - decoder.Decode(&m.FileIdSequence) - log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) - //in case the server stops between intervals - m.FileIdSequence += FileIdSaveInterval - } - return -} - -//count should be 1 or more -func (m *SequencerImpl) NextFileId(count int) (uint64, int) { - if count <= 0 { - return 0, 0 - } - m.sequenceLock.Lock() - defer m.sequenceLock.Unlock() - if m.fileIdCounter < uint64(count) { - m.fileIdCounter = FileIdSaveInterval - m.FileIdSequence += FileIdSaveInterval - m.saveSequence() - } - m.fileIdCounter = m.fileIdCounter - uint64(count) - return m.FileIdSequence - m.fileIdCounter, count -} -func (m *SequencerImpl) saveSequence() { - log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) - seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) - if e != nil { - log.Fatalf("Sequence File Save [ERROR] %s\n", e) - } - defer seqFile.Close() - encoder := gob.NewEncoder(seqFile) - encoder.Encode(m.FileIdSequence) -} diff --git a/weed/shell.go b/weed/shell.go deleted file mode 100644 index daf0b7e1f..000000000 --- a/weed/shell.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "os" -) - -func init() { - cmdShell.Run = runShell // break init cycle -} - -var cmdShell = &Command{ - UsageLine: "shell", - Short: "run interactive commands, now just echo", - Long: `run interactive commands. - - `, -} - -var () - -func runShell(command *Command, args []string) bool { - r := bufio.NewReader(os.Stdin) - o := bufio.NewWriter(os.Stdout) - e := bufio.NewWriter(os.Stderr) - prompt := func() { - o.WriteString("> ") - o.Flush() - } - readLine := func() string { - ret, err := r.ReadString('\n') - if err != nil { - fmt.Fprint(e, err) - os.Exit(1) - } - return ret - } - execCmd := func(cmd string) int { - if cmd != "" { - o.WriteString(cmd) - } - return 0 - } - - cmd := "" - for { - prompt() - cmd = readLine() - execCmd(cmd) - } - return true -} diff --git a/weed/storage/compact_map.go b/weed/storage/compact_map.go deleted file mode 100644 index 0b33961c4..000000000 --- a/weed/storage/compact_map.go +++ /dev/null @@ -1,182 +0,0 @@ -package storage - -import () - -type NeedleValue struct { - Key Key - Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 "Size of the data portion" -} - -const ( - batch = 100000 -) - -type Key uint64 - -type CompactSection struct { - values []NeedleValue - overflow map[Key]NeedleValue - start Key - end Key - counter int -} - -func NewCompactSection(start Key) CompactSection { - return CompactSection{ - values: make([]NeedleValue, batch), - overflow: make(map[Key]NeedleValue), - start: start, - } -} - -//return old entry size -func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { - ret := uint32(0) - if key > cs.end { - cs.end = key - } - if i := cs.binarySearchValues(key); i >= 0 { - ret = cs.values[i].Size - //println("key", key, "old size", ret) - cs.values[i].Offset, cs.values[i].Size = offset, size - } else { - needOverflow := cs.counter >= batch - needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key - if needOverflow { - //println("start", cs.start, "counter", cs.counter, "key", key) - if oldValue, found := cs.overflow[key]; found { - ret = oldValue.Size - } - cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} - } else { - p := &cs.values[cs.counter] - p.Key, p.Offset, p.Size = key, offset, size - //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) - cs.counter++ - } - } - return ret -} - -//return old entry size -func (cs *CompactSection) Delete(key Key) uint32 { - ret := uint32(0) - if i := cs.binarySearchValues(key); i >= 0 { - if cs.values[i].Size > 0 { - ret = cs.values[i].Size - cs.values[i].Size = 0 - } - } - if v, found := cs.overflow[key]; found { - delete(cs.overflow, key) - ret = v.Size - } - return ret -} -func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { - if v, ok := cs.overflow[key]; ok { - return &v, true - } - if i := cs.binarySearchValues(key); i >= 0 { - return &cs.values[i], true - } - return nil, false -} -func (cs *CompactSection) binarySearchValues(key Key) int { - l, h := 0, cs.counter-1 - if h >= 0 && cs.values[h].Key < key { - return -2 - } - //println("looking for key", key) - for l <= h { - m := (l + h) / 2 - //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) - if cs.values[m].Key < key { - l = m + 1 - } else if key < cs.values[m].Key { - h = m - 1 - } else { - //println("found", m) - return m - } - } - return -1 -} - -//This map assumes mostly inserting increasing keys -type CompactMap struct { - list []CompactSection -} - -func NewCompactMap() CompactMap { - return CompactMap{} -} - -func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { - x := cm.binarySearchCompactSection(key) - if x < 0 { - //println(x, "creating", len(cm.list), "section1, starting", key) - cm.list = append(cm.list, NewCompactSection(key)) - x = len(cm.list) - 1 - } - return cm.list[x].Set(key, offset, size) -} -func (cm *CompactMap) Delete(key Key) uint32 { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return uint32(0) - } - return cm.list[x].Delete(key) -} -func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { - x := cm.binarySearchCompactSection(key) - if x < 0 { - return nil, false - } - return cm.list[x].Get(key) -} -func (cm *CompactMap) binarySearchCompactSection(key Key) int { - l, h := 0, len(cm.list)-1 - if h < 0 { - return -5 - } - if cm.list[h].start <= key { - if cm.list[h].counter < batch || key <= cm.list[h].end { - return h - } else { - return -4 - } - } - for l <= h { - m := (l + h) / 2 - if key < cm.list[m].start { - h = m - 1 - } else { // cm.list[m].start <= key - if cm.list[m+1].start <= key { - l = m + 1 - } else { - return m - } - } - } - return -3 -} - -func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { - for _, cs := range cm.list { - for _, v := range cs.overflow { - if err := visit(v); err != nil { - return err - } - } - for _, v := range cs.values { - if _, found := cs.overflow[v.Key]; !found { - if err := visit(v); err != nil { - return err - } - } - } - } - return nil -} diff --git a/weed/storage/compact_map_perf_test.go b/weed/storage/compact_map_perf_test.go deleted file mode 100644 index 8e114d305..000000000 --- a/weed/storage/compact_map_perf_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package storage - -import ( - "log" - "os" - "code.google.com/p/weed-fs/weed/util" - "testing" -) - -func TestMemoryUsage(t *testing.T) { - - indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) - if ie != nil { - log.Fatalln(ie) - } - LoadNewNeedleMap(indexFile) - -} - -func LoadNewNeedleMap(file *os.File) CompactMap { - m := NewCompactMap() - bytes := make([]byte, 16*1024) - count, e := file.Read(bytes) - if count > 0 { - fstat, _ := file.Stat() - log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) - } - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - if offset > 0 { - m.Set(Key(key), offset, size) - } else { - //delete(m, key) - } - } - - count, e = file.Read(bytes) - } - return m -} diff --git a/weed/storage/compact_map_test.go b/weed/storage/compact_map_test.go deleted file mode 100644 index e76e9578d..000000000 --- a/weed/storage/compact_map_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package storage - -import ( - "testing" -) - -func TestXYZ(t *testing.T) { - m := NewCompactMap() - for i := uint32(0); i < 100*batch; i += 2 { - m.Set(Key(i), i, i) - } - - for i := uint32(0); i < 100*batch; i += 37 { - m.Delete(Key(i)) - } - - for i := uint32(0); i < 10*batch; i += 3 { - m.Set(Key(i), i+11, i+5) - } - - // for i := uint32(0); i < 100; i++ { - // if v := m.Get(Key(i)); v != nil { - // println(i, "=", v.Key, v.Offset, v.Size) - // } - // } - - for i := uint32(0); i < 10*batch; i++ { - v, ok := m.Get(Key(i)) - if i%3 == 0 { - if !ok { - t.Fatal("key", i, "missing!") - } - if v.Size != i+5 { - t.Fatal("key", i, "size", v.Size) - } - } else if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v.Size != i { - t.Fatal("key", i, "size", v.Size) - } - } - } - - for i := uint32(10 * batch); i < 100*batch; i++ { - v, ok := m.Get(Key(i)) - if i%37 == 0 { - if ok && v.Size > 0 { - t.Fatal("key", i, "should have been deleted needle value", v) - } - } else if i%2 == 0 { - if v == nil { - t.Fatal("key", i, "missing") - } - if v.Size != i { - t.Fatal("key", i, "size", v.Size) - } - } - } - -} diff --git a/weed/storage/compress.go b/weed/storage/compress.go deleted file mode 100644 index 256789c9c..000000000 --- a/weed/storage/compress.go +++ /dev/null @@ -1,57 +0,0 @@ -package storage - -import ( - "bytes" - "compress/flate" - "compress/gzip" - "io/ioutil" - "strings" -) - -/* -* Default more not to gzip since gzip can be done on client side. -*/ -func IsGzippable(ext, mtype string) bool { - if strings.HasPrefix(mtype, "text/") { - return true - } - switch ext { - case ".zip", ".rar", ".gz", ".bz2", ".xz": - return false - case ".pdf", ".txt", ".html", ".css", ".js", ".json": - return true - } - if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true - } - if strings.HasSuffix(mtype, "script") { - return true - } - } - return false -} - -func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) - if _, err := w.Write(input); err != nil { - println("error compressing data:", err) - return nil, err - } - if err := w.Close(); err != nil { - println("error closing compressed data:", err) - return nil, err - } - return buf.Bytes(), nil -} -func UnGzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - println("error uncompressing data:", err) - } - return output, err -} diff --git a/weed/storage/crc.go b/weed/storage/crc.go deleted file mode 100644 index 198352e68..000000000 --- a/weed/storage/crc.go +++ /dev/null @@ -1,21 +0,0 @@ -package storage - -import ( - "hash/crc32" -) - -var table = crc32.MakeTable(crc32.Castagnoli) - -type CRC uint32 - -func NewCRC(b []byte) CRC { - return CRC(0).Update(b) -} - -func (c CRC) Update(b []byte) CRC { - return CRC(crc32.Update(uint32(c), table, b)) -} - -func (c CRC) Value() uint32 { - return uint32(c>>15|c<<17) + 0xa282ead8 -} diff --git a/weed/storage/needle.go b/weed/storage/needle.go deleted file mode 100644 index 6ba164735..000000000 --- a/weed/storage/needle.go +++ /dev/null @@ -1,132 +0,0 @@ -package storage - -import ( - "encoding/hex" - "fmt" - "io/ioutil" - "mime" - "net/http" - "path" - "code.google.com/p/weed-fs/weed/util" - "strconv" - "strings" -) - -const ( - NeedleHeaderSize = 16 //should never change this - NeedlePaddingSize = 8 - NeedleChecksumSize = 4 -) - -type Needle struct { - Cookie uint32 "random number to mitigate brute force lookups" - Id uint64 "needle id" - Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime" - - DataSize uint32 "Data size" //version2 - Data []byte "The actual file data" - Flags byte "boolean flags" //version2 - NameSize uint8 //version2 - Name []byte "maximum 256 characters" //version2 - MimeSize uint8 //version2 - Mime []byte "maximum 256 characters" //version2 - - Checksum CRC "CRC32 to check integrity" - Padding []byte "Aligned to 8 bytes" -} - -func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { - - n = new(Needle) - form, fe := r.MultipartReader() - if fe != nil { - fmt.Println("MultipartReader [ERROR]", fe) - e = fe - return - } - part, fe := form.NextPart() - if fe != nil { - fmt.Println("Reading Multi part [ERROR]", fe) - e = fe - return - } - fname = part.FileName() - fname = path.Base(fname) - data, _ := ioutil.ReadAll(part) - dotIndex := strings.LastIndex(fname, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = fname[dotIndex:] - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && mtype != contentType && len(contentType) < 256 { - n.Mime = []byte(contentType) - n.SetHasMime() - mtype = contentType - } - if IsGzippable(ext, mtype) { - if data, e = GzipData(data); e != nil { - return - } - n.SetGzipped() - } - if ext == ".gz" { - n.SetGzipped() - } - if len(fname) < 256 { - if strings.HasSuffix(fname, ".gz") { - n.Name = []byte(fname[:len(fname)-3]) - } else { - n.Name = []byte(fname) - } - n.SetHasName() - } - - n.Data = data - n.Checksum = NewCRC(data) - - commaSep := strings.LastIndex(r.URL.Path, ",") - dotSep := strings.LastIndex(r.URL.Path, ".") - fid := r.URL.Path[commaSep+1:] - if dotSep > 0 { - fid = r.URL.Path[commaSep+1 : dotSep] - } - - n.ParsePath(fid) - - return -} -func (n *Needle) ParsePath(fid string) { - length := len(fid) - if length <= 8 { - if length > 0 { - println("Invalid fid", fid, "length", length) - } - return - } - delta := "" - deltaIndex := strings.LastIndex(fid, "_") - if deltaIndex > 0 { - fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] - } - n.Id, n.Cookie = ParseKeyHash(fid) - if delta != "" { - d, e := strconv.ParseUint(delta, 10, 64) - if e == nil { - n.Id += d - } - } -} - -func ParseKeyHash(key_hash_string string) (uint64, uint32) { - key_hash_bytes, khe := hex.DecodeString(key_hash_string) - key_hash_len := len(key_hash_bytes) - if khe != nil || key_hash_len <= 4 { - println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe) - return 0, 0 - } - key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) - hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) - return key, hash -} diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go deleted file mode 100644 index 47e1c43f0..000000000 --- a/weed/storage/needle_map.go +++ /dev/null @@ -1,99 +0,0 @@ -package storage - -import ( - //"log" - "os" - "code.google.com/p/weed-fs/weed/util" -) - -type NeedleMap struct { - indexFile *os.File - m CompactMap - - //transient - bytes []byte - - deletionCounter int - fileCounter int - deletionByteCounter uint64 - fileByteCounter uint64 -} - -func NewNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: NewCompactMap(), - bytes: make([]byte, 16), - indexFile: file, - } - return nm -} - -const ( - RowsToRead = 1024 -) - -func LoadNeedleMap(file *os.File) *NeedleMap { - nm := NewNeedleMap(file) - bytes := make([]byte, 16*RowsToRead) - count, e := nm.indexFile.Read(bytes) - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } else { - oldSize := nm.m.Delete(Key(key)) - //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - } - - count, e = nm.indexFile.Read(bytes) - } - return nm -} - -func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { - oldSize := nm.m.Set(Key(key), offset, size) - util.Uint64toBytes(nm.bytes[0:8], key) - util.Uint32toBytes(nm.bytes[8:12], offset) - util.Uint32toBytes(nm.bytes[12:16], size) - nm.fileCounter++ - nm.fileByteCounter = nm.fileByteCounter + uint64(size) - if oldSize > 0 { - nm.deletionCounter++ - nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) - } - return nm.indexFile.Write(nm.bytes) -} -func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { - element, ok = nm.m.Get(Key(key)) - return -} -func (nm *NeedleMap) Delete(key uint64) { - nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) - util.Uint64toBytes(nm.bytes[0:8], key) - util.Uint32toBytes(nm.bytes[8:12], 0) - util.Uint32toBytes(nm.bytes[12:16], 0) - nm.indexFile.Write(nm.bytes) - nm.deletionCounter++ -} -func (nm *NeedleMap) Close() { - nm.indexFile.Close() -} -func (nm *NeedleMap) ContentSize() uint64 { - return nm.fileByteCounter -} -func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { - return nm.m.Visit(visit) -} diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go deleted file mode 100644 index 74e866706..000000000 --- a/weed/storage/needle_read_write.go +++ /dev/null @@ -1,238 +0,0 @@ -package storage - -import ( - "errors" - "fmt" - "io" - "os" - "code.google.com/p/weed-fs/weed/util" -) - -const ( - FlagGzip = 0x01 - FlagHasName = 0x02 - FlagHasMime = 0x04 -) - -func (n *Needle) DiskSize() uint32 { - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) - return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize -} -func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { - if s, ok := w.(io.Seeker); ok { - if end, e := s.Seek(0, 1); e == nil { - defer func(s io.Seeker, off int64) { - if err != nil { - if _, e = s.Seek(off, 0); e != nil { - fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e) - } - } - }(s, end) - } else { - err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e) - return - } - } - switch version { - case Version1: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) - n.Size = uint32(len(n.Data)) - size = n.Size - util.Uint32toBytes(header[12:16], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) - util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return - case Version2: - header := make([]byte, NeedleHeaderSize) - util.Uint32toBytes(header[0:4], n.Cookie) - util.Uint64toBytes(header[4:12], n.Id) - n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) - if n.DataSize > 0 { - n.Size = 4 + n.DataSize + 1 - if n.HasName() { - n.Size = n.Size + 1 + uint32(n.NameSize) - } - if n.HasMime() { - n.Size = n.Size + 1 + uint32(n.MimeSize) - } - } - size = n.DataSize - util.Uint32toBytes(header[12:16], n.Size) - if _, err = w.Write(header); err != nil { - return - } - if n.DataSize > 0 { - util.Uint32toBytes(header[0:4], n.DataSize) - if _, err = w.Write(header[0:4]); err != nil { - return - } - if _, err = w.Write(n.Data); err != nil { - return - } - util.Uint8toBytes(header[0:1], n.Flags) - if _, err = w.Write(header[0:1]); err != nil { - return - } - } - if n.HasName() { - util.Uint8toBytes(header[0:1], n.NameSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Name); err != nil { - return - } - } - if n.HasMime() { - util.Uint8toBytes(header[0:1], n.MimeSize) - if _, err = w.Write(header[0:1]); err != nil { - return - } - if _, err = w.Write(n.Mime); err != nil { - return - } - } - padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) - util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) - _, err = w.Write(header[0 : NeedleChecksumSize+padding]) - return n.DataSize, err - } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) -} - -func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) { - switch version { - case Version1: - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.Read(bytes); err != nil { - return - } - n.readNeedleHeader(bytes) - n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) - if checksum != NewCRC(n.Data).Value() { - return 0, errors.New("CRC error! Data On Disk Corrupted!") - } - return - case Version2: - if size == 0 { - return 0, nil - } - bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) - if ret, err = r.Read(bytes); err != nil { - return - } - if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { - return 0, errors.New("File Entry Not Found!") - } - n.readNeedleHeader(bytes) - if n.Size != size { - return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size) - } - n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) - checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) - if checksum != NewCRC(n.Data).Value() { - return 0, errors.New("CRC error! Data On Disk Corrupted!") - } - return - } - return 0, fmt.Errorf("Unsupported Version! (%d)", version) -} -func (n *Needle) readNeedleHeader(bytes []byte) { - n.Cookie = util.BytesToUint32(bytes[0:4]) - n.Id = util.BytesToUint64(bytes[4:12]) - n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) -} -func (n *Needle) readNeedleDataVersion2(bytes []byte) { - index, lenBytes := 0, len(bytes) - if index < lenBytes { - n.DataSize = util.BytesToUint32(bytes[index : index+4]) - index = index + 4 - n.Data = bytes[index : index+int(n.DataSize)] - index = index + int(n.DataSize) - n.Flags = bytes[index] - index = index + 1 - } - if index < lenBytes && n.HasName() { - n.NameSize = uint8(bytes[index]) - index = index + 1 - n.Name = bytes[index : index+int(n.NameSize)] - index = index + int(n.NameSize) - } - if index < lenBytes && n.HasMime() { - n.MimeSize = uint8(bytes[index]) - index = index + 1 - n.Mime = bytes[index : index+int(n.MimeSize)] - } -} - -func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) { - n = new(Needle) - if version == Version1 || version == Version2 { - bytes := make([]byte, NeedleHeaderSize) - var count int - count, err = r.Read(bytes) - if count <= 0 || err != nil { - return nil, 0, err - } - n.readNeedleHeader(bytes) - padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) - bodyLength = n.Size + NeedleChecksumSize + padding - } - return -} - -//n should be a needle already read the header -//the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) { - if bodyLength <= 0 { - return nil - } - switch version { - case Version1: - bytes := make([]byte, bodyLength) - if _, err = r.Read(bytes); err != nil { - return - } - n.Data = bytes[:n.Size] - n.Checksum = NewCRC(n.Data) - case Version2: - bytes := make([]byte, bodyLength) - if _, err = r.Read(bytes); err != nil { - return - } - n.readNeedleDataVersion2(bytes[0:n.Size]) - n.Checksum = NewCRC(n.Data) - default: - err = fmt.Errorf("Unsupported Version! (%d)", version) - } - return -} - -func (n *Needle) IsGzipped() bool { - return n.Flags&FlagGzip > 0 -} -func (n *Needle) SetGzipped() { - n.Flags = n.Flags | FlagGzip -} -func (n *Needle) HasName() bool { - return n.Flags&FlagHasName > 0 -} -func (n *Needle) SetHasName() { - n.Flags = n.Flags | FlagHasName -} -func (n *Needle) HasMime() bool { - return n.Flags&FlagHasMime > 0 -} -func (n *Needle) SetHasMime() { - n.Flags = n.Flags | FlagHasMime -} diff --git a/weed/storage/replication_type.go b/weed/storage/replication_type.go deleted file mode 100644 index 0902d1016..000000000 --- a/weed/storage/replication_type.go +++ /dev/null @@ -1,123 +0,0 @@ -package storage - -import ( - "errors" -) - -type ReplicationType string - -const ( - Copy000 = ReplicationType("000") // single copy - Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center - Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center - Copy100 = ReplicationType("100") // 2 copies, each on different data center - Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center - Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center - LengthRelicationType = 6 - CopyNil = ReplicationType(255) // nil value -) - -func NewReplicationTypeFromString(t string) (ReplicationType, error) { - switch t { - case "000": - return Copy000, nil - case "001": - return Copy001, nil - case "010": - return Copy010, nil - case "100": - return Copy100, nil - case "110": - return Copy110, nil - case "200": - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + t) -} -func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { - switch b { - case byte(000): - return Copy000, nil - case byte(001): - return Copy001, nil - case byte(010): - return Copy010, nil - case byte(100): - return Copy100, nil - case byte(110): - return Copy110, nil - case byte(200): - return Copy200, nil - } - return Copy000, errors.New("Unknown Replication Type:" + string(b)) -} - -func (r *ReplicationType) String() string { - switch *r { - case Copy000: - return "000" - case Copy001: - return "001" - case Copy010: - return "010" - case Copy100: - return "100" - case Copy110: - return "110" - case Copy200: - return "200" - } - return "000" -} -func (r *ReplicationType) Byte() byte { - switch *r { - case Copy000: - return byte(000) - case Copy001: - return byte(001) - case Copy010: - return byte(010) - case Copy100: - return byte(100) - case Copy110: - return byte(110) - case Copy200: - return byte(200) - } - return byte(000) -} - -func (repType ReplicationType) GetReplicationLevelIndex() int { - switch repType { - case Copy000: - return 0 - case Copy001: - return 1 - case Copy010: - return 2 - case Copy100: - return 3 - case Copy110: - return 4 - case Copy200: - return 5 - } - return -1 -} -func (repType ReplicationType) GetCopyCount() int { - switch repType { - case Copy000: - return 1 - case Copy001: - return 2 - case Copy010: - return 2 - case Copy100: - return 2 - case Copy110: - return 3 - case Copy200: - return 3 - } - return 0 -} diff --git a/weed/storage/sample.idx b/weed/storage/sample.idx Binary files differdeleted file mode 100644 index 44918b41d..000000000 --- a/weed/storage/sample.idx +++ /dev/null diff --git a/weed/storage/store.go b/weed/storage/store.go deleted file mode 100644 index dc9304e7d..000000000 --- a/weed/storage/store.go +++ /dev/null @@ -1,204 +0,0 @@ -package storage - -import ( - "encoding/json" - "errors" - "io/ioutil" - "log" - "net/url" - "code.google.com/p/weed-fs/weed/util" - "strconv" - "strings" -) - -type Store struct { - volumes map[VolumeId]*Volume - dir string - Port int - Ip string - PublicUrl string - MaxVolumeCount int - - masterNode string - connected bool - volumeSizeLimit uint64 //read from the master - -} - -func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { - s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount} - s.volumes = make(map[VolumeId]*Volume) - s.loadExistingVolumes() - - log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") - return -} -func (s *Store) AddVolume(volumeListString string, replicationType string) error { - rt, e := NewReplicationTypeFromString(replicationType) - if e != nil { - return e - } - for _, range_string := range strings.Split(volumeListString, ",") { - if strings.Index(range_string, "-") < 0 { - id_string := range_string - id, err := NewVolumeId(id_string) - if err != nil { - return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") - } - e = s.addVolume(VolumeId(id), rt) - } else { - pair := strings.Split(range_string, "-") - start, start_err := strconv.ParseUint(pair[0], 10, 64) - if start_err != nil { - return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!") - } - end, end_err := strconv.ParseUint(pair[1], 10, 64) - if end_err != nil { - return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") - } - for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), rt); err != nil { - e = err - } - } - } - } - return e -} -func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { - if s.volumes[vid] != nil { - return errors.New("Volume Id " + vid.String() + " already exists!") - } - log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) - s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) - return err -} - -func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false - } - garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) - if e != nil { - return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false - } - return nil, garbageThreshold < s.volumes[vid].garbageLevel() -} -func (s *Store) CompactVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") - } - return s.volumes[vid].compact() -} -func (s *Store) CommitCompactVolume(volumeIdString string) error { - vid, err := NewVolumeId(volumeIdString) - if err != nil { - return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") - } - return s.volumes[vid].commitCompact() -} -func (s *Store) loadExistingVolumes() { - if dirs, err := ioutil.ReadDir(s.dir); err == nil { - for _, dir := range dirs { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".dat") { - base := name[:len(name)-len(".dat")] - if vid, err := NewVolumeId(base); err == nil { - if s.volumes[vid] == nil { - if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { - s.volumes[vid] = v - log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) - } - } - } - } - } - } -} -func (s *Store) Status() []*VolumeInfo { - var stats []*VolumeInfo - for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = - VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter - stats = append(stats, s) - } - return stats -} - -type JoinResult struct { - VolumeSizeLimit uint64 -} - -func (s *Store) SetMaster(mserver string) { - s.masterNode = mserver -} -func (s *Store) Join() error { - stats := new([]*VolumeInfo) - for k, v := range s.volumes { - s := new(VolumeInfo) - s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = - VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter - *stats = append(*stats, s) - } - bytes, _ := json.Marshal(stats) - values := make(url.Values) - if !s.connected { - values.Add("init", "true") - } - values.Add("port", strconv.Itoa(s.Port)) - values.Add("ip", s.Ip) - values.Add("publicUrl", s.PublicUrl) - values.Add("volumes", string(bytes)) - values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) - jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) - if err != nil { - return err - } - var ret JoinResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - s.volumeSizeLimit = ret.VolumeSizeLimit - s.connected = true - return nil -} -func (s *Store) Close() { - for _, v := range s.volumes { - v.Close() - } -} -func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { - if v := s.volumes[i]; v != nil { - size, err = v.write(n) - if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { - log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) - s.Join() - } - return - } - log.Println("volume", i, "not found!") - return -} -func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { - if v := s.volumes[i]; v != nil { - return v.delete(n) - } - return 0, nil -} -func (s *Store) Read(i VolumeId, n *Needle) (int, error) { - if v := s.volumes[i]; v != nil { - return v.read(n) - } - return 0, errors.New("Not Found") -} -func (s *Store) GetVolume(i VolumeId) *Volume { - return s.volumes[i] -} - -func (s *Store) HasVolume(i VolumeId) bool { - _, ok := s.volumes[i] - return ok -} diff --git a/weed/storage/volume.go b/weed/storage/volume.go deleted file mode 100644 index 707c6e6f8..000000000 --- a/weed/storage/volume.go +++ /dev/null @@ -1,274 +0,0 @@ -package storage - -import ( - "errors" - "fmt" - "io" - "os" - "path" - "sync" -) - -const ( - SuperBlockSize = 8 -) - -type SuperBlock struct { - Version Version - ReplicaType ReplicationType -} - -func (s *SuperBlock) Bytes() []byte { - header := make([]byte, SuperBlockSize) - header[0] = byte(s.Version) - header[1] = s.ReplicaType.Byte() - return header -} - -type Volume struct { - Id VolumeId - dir string - dataFile *os.File - nm *NeedleMap - - SuperBlock - - accessLock sync.Mutex -} - -func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { - v = &Volume{dir: dirname, Id: id} - v.SuperBlock = SuperBlock{ReplicaType: replicationType} - e = v.load(true) - return -} -func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) { - v = &Volume{dir: dirname, Id: id} - v.SuperBlock = SuperBlock{ReplicaType: CopyNil} - e = v.load(false) - return -} -func (v *Volume) load(alsoLoadIndex bool) error { - var e error - fileName := path.Join(v.dir, v.Id.String()) - v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) - if e != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) - } - if v.ReplicaType == CopyNil { - if e = v.readSuperBlock(); e != nil { - return e - } - } else { - v.maybeWriteSuperBlock() - } - if alsoLoadIndex { - indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if ie != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) - } - v.nm = LoadNeedleMap(indexFile) - } - return nil -} -func (v *Volume) Version() Version { - return v.SuperBlock.Version -} -func (v *Volume) Size() int64 { - v.accessLock.Lock() - defer v.accessLock.Unlock() - stat, e := v.dataFile.Stat() - if e == nil { - return stat.Size() - } - fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) - return -1 -} -func (v *Volume) Close() { - v.accessLock.Lock() - defer v.accessLock.Unlock() - v.nm.Close() - v.dataFile.Close() -} -func (v *Volume) maybeWriteSuperBlock() { - stat, e := v.dataFile.Stat() - if e != nil { - fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e) - return - } - if stat.Size() == 0 { - v.SuperBlock.Version = CurrentVersion - v.dataFile.Write(v.SuperBlock.Bytes()) - } -} -func (v *Volume) readSuperBlock() (err error) { - v.dataFile.Seek(0, 0) - header := make([]byte, SuperBlockSize) - if _, e := v.dataFile.Read(header); e != nil { - return fmt.Errorf("cannot read superblock: %s", e) - } - v.SuperBlock, err = ParseSuperBlock(header) - return err -} -func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { - superBlock.Version = Version(header[0]) - if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { - err = fmt.Errorf("cannot read replica type: %s", err) - } - return -} -func (v *Volume) NeedToReplicate() bool { - return v.ReplicaType.GetCopyCount() > 1 -} - -func (v *Volume) write(n *Needle) (size uint32, err error) { - v.accessLock.Lock() - defer v.accessLock.Unlock() - var offset int64 - if offset, err = v.dataFile.Seek(0, 2); err != nil { - return - } - if size, err = n.Append(v.dataFile, v.Version()); err != nil { - return - } - nv, ok := v.nm.Get(n.Id) - if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { - _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size) - } - return -} -func (v *Volume) delete(n *Needle) (uint32, error) { - v.accessLock.Lock() - defer v.accessLock.Unlock() - nv, ok := v.nm.Get(n.Id) - //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) - if ok { - v.nm.Delete(n.Id) - v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) - _, err := n.Append(v.dataFile, v.Version()) - return nv.Size, err - } - return 0, nil -} - -func (v *Volume) read(n *Needle) (int, error) { - v.accessLock.Lock() - defer v.accessLock.Unlock() - nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) - return n.Read(v.dataFile, nv.Size, v.Version()) - } - return -1, errors.New("Not Found") -} - -func (v *Volume) garbageLevel() float64 { - return float64(v.nm.deletionByteCounter) / float64(v.ContentSize()) -} - -func (v *Volume) compact() error { - v.accessLock.Lock() - defer v.accessLock.Unlock() - - filePath := path.Join(v.dir, v.Id.String()) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") -} -func (v *Volume) commitCompact() error { - v.accessLock.Lock() - defer v.accessLock.Unlock() - v.dataFile.Close() - var e error - if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { - return e - } - if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { - return e - } - if e = v.load(true); e != nil { - return e - } - return nil -} - -func ScanVolumeFile(dirname string, id VolumeId, - visitSuperBlock func(SuperBlock) error, - visitNeedle func(n *Needle, offset uint32) error) (err error) { - var v *Volume - if v, err = LoadVolumeOnly(dirname, id); err != nil { - return - } - if err = visitSuperBlock(v.SuperBlock); err != nil { - return - } - - version := v.Version() - - offset := uint32(SuperBlockSize) - n, rest, e := ReadNeedleHeader(v.dataFile, version) - if e != nil { - err = fmt.Errorf("cannot read needle header: %s", e) - return - } - for n != nil { - if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil { - err = fmt.Errorf("cannot read needle body: %s", err) - return - } - if err = visitNeedle(n, offset); err != nil { - return - } - offset += NeedleHeaderSize + rest - if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header: %s", err) - } - } - - return -} - -func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { - var ( - dst, idx *os.File - ) - if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { - return - } - defer dst.Close() - - if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { - return - } - defer idx.Close() - - nm := NewNeedleMap(idx) - new_offset := uint32(SuperBlockSize) - - err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error { - _, err = dst.Write(superBlock.Bytes()) - return err - }, func(n *Needle, offset uint32) error { - nv, ok := v.nm.Get(n.Id) - //log.Println("file size is", n.Size, "rest", rest) - if ok && nv.Offset*NeedlePaddingSize == offset { - if nv.Size > 0 { - if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil { - return fmt.Errorf("cannot put needle: %s", err) - } - if _, err = n.Append(dst, v.Version()); err != nil { - return fmt.Errorf("cannot append needle: %s", err) - } - new_offset += n.DiskSize() - //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest) - } - } - return nil - }) - - return -} -func (v *Volume) ContentSize() uint64 { - return v.nm.fileByteCounter -} diff --git a/weed/storage/volume_id.go b/weed/storage/volume_id.go deleted file mode 100644 index 0333c6cf0..000000000 --- a/weed/storage/volume_id.go +++ /dev/null @@ -1,18 +0,0 @@ -package storage - -import ( - "strconv" -) - -type VolumeId uint32 - -func NewVolumeId(vid string) (VolumeId, error) { - volumeId, err := strconv.ParseUint(vid, 10, 64) - return VolumeId(volumeId), err -} -func (vid *VolumeId) String() string { - return strconv.FormatUint(uint64(*vid), 10) -} -func (vid *VolumeId) Next() VolumeId { - return VolumeId(uint32(*vid) + 1) -} diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go deleted file mode 100644 index e4c5f6ec4..000000000 --- a/weed/storage/volume_info.go +++ /dev/null @@ -1,13 +0,0 @@ -package storage - -import () - -type VolumeInfo struct { - Id VolumeId - Size uint64 - RepType ReplicationType - Version Version - FileCount int - DeleteCount int - DeletedByteCount uint64 -} diff --git a/weed/storage/volume_version.go b/weed/storage/volume_version.go deleted file mode 100644 index 9702ae904..000000000 --- a/weed/storage/volume_version.go +++ /dev/null @@ -1,11 +0,0 @@ -package storage - -import () - -type Version uint8 - -const ( - Version1 = Version(1) - Version2 = Version(2) - CurrentVersion = Version2 -) diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go deleted file mode 100644 index 4c8424214..000000000 --- a/weed/topology/configuration.go +++ /dev/null @@ -1,56 +0,0 @@ -package topology - -import ( - "encoding/xml" -) - -type loc struct { - dcName string - rackName string -} -type rack struct { - Name string `xml:"name,attr"` - Ips []string `xml:"Ip"` -} -type dataCenter struct { - Name string `xml:"name,attr"` - Racks []rack `xml:"Rack"` -} -type topology struct { - DataCenters []dataCenter `xml:"DataCenter"` -} -type Configuration struct { - XMLName xml.Name `xml:"Configuration"` - Topo topology `xml:"Topology"` - ip2location map[string]loc -} - -func NewConfiguration(b []byte) (*Configuration, error) { - c := &Configuration{} - err := xml.Unmarshal(b, c) - c.ip2location = make(map[string]loc) - for _, dc := range c.Topo.DataCenters { - for _, rack := range dc.Racks { - for _, ip := range rack.Ips { - c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name} - } - } - } - return c, err -} - -func (c *Configuration) String() string { - if b, e := xml.MarshalIndent(c, " ", " "); e == nil { - return string(b) - } - return "" -} - -func (c *Configuration) Locate(ip string) (dc string, rack string) { - if c != nil && c.ip2location != nil { - if loc, ok := c.ip2location[ip]; ok { - return loc.dcName, loc.rackName - } - } - return "DefaultDataCenter", "DefaultRack" -} diff --git a/weed/topology/configuration_test.go b/weed/topology/configuration_test.go deleted file mode 100644 index 35d82c058..000000000 --- a/weed/topology/configuration_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package topology - -import ( - "fmt" - "testing" -) - -func TestLoadConfiguration(t *testing.T) { - - confContent := ` - -<?xml version="1.0" encoding="UTF-8" ?> -<Configuration> - <Topology> - <DataCenter name="dc1"> - <Rack name="rack1"> - <Ip>192.168.1.1</Ip> - </Rack> - </DataCenter> - <DataCenter name="dc2"> - <Rack name="rack1"> - <Ip>192.168.1.2</Ip> - </Rack> - <Rack name="rack2"> - <Ip>192.168.1.3</Ip> - <Ip>192.168.1.4</Ip> - </Rack> - </DataCenter> - </Topology> -</Configuration> -` - c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err != nil { - t.Fatalf("unmarshal error:%s", err.Error()) - } - - if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s", c) - } -} diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go deleted file mode 100644 index a3b2b7d13..000000000 --- a/weed/topology/data_center.go +++ /dev/null @@ -1,41 +0,0 @@ -package topology - -import () - -type DataCenter struct { - NodeImpl -} - -func NewDataCenter(id string) *DataCenter { - dc := &DataCenter{} - dc.id = NodeId(id) - dc.nodeType = "DataCenter" - dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc - return dc -} - -func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack -} - -func (dc *DataCenter) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m -} diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go deleted file mode 100644 index dbb634af2..000000000 --- a/weed/topology/data_node.go +++ /dev/null @@ -1,60 +0,0 @@ -package topology - -import ( - _ "fmt" - "code.google.com/p/weed-fs/weed/storage" - "strconv" -) - -type DataNode struct { - NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds - Dead bool -} - -func NewDataNode(id string) *DataNode { - s := &DataNode{} - s.id = NodeId(id) - s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) - s.NodeImpl.value = s - return s -} -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { - if _, ok := dn.volumes[v.Id]; !ok { - dn.volumes[v.Id] = v - dn.UpAdjustVolumeCountDelta(1) - dn.UpAdjustActiveVolumeCountDelta(1) - dn.UpAdjustMaxVolumeId(v.Id) - } else { - dn.volumes[v.Id] = v - } -} -func (dn *DataNode) GetTopology() *Topology { - p := dn.parent - for p.Parent() != nil { - p = p.Parent() - } - t := p.(*Topology) - return t -} -func (dn *DataNode) MatchLocation(ip string, port int) bool { - return dn.Ip == ip && dn.Port == port -} -func (dn *DataNode) Url() string { - return dn.Ip + ":" + strconv.Itoa(dn.Port) -} - -func (dn *DataNode) ToMap() interface{} { - ret := make(map[string]interface{}) - ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetVolumeCount() - ret["Max"] = dn.GetMaxVolumeCount() - ret["Free"] = dn.FreeSpace() - ret["PublicUrl"] = dn.PublicUrl - return ret -} diff --git a/weed/topology/node.go b/weed/topology/node.go deleted file mode 100644 index fe69c57c0..000000000 --- a/weed/topology/node.go +++ /dev/null @@ -1,200 +0,0 @@ -package topology - -import ( - "fmt" - "code.google.com/p/weed-fs/weed/storage" -) - -type NodeId string -type Node interface { - Id() NodeId - String() string - FreeSpace() int - ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) - UpAdjustMaxVolumeId(vid storage.VolumeId) - - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int - GetMaxVolumeId() storage.VolumeId - SetParent(Node) - LinkChildNode(node Node) - UnlinkChildNode(nodeId NodeId) - CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) - - IsDataNode() bool - Children() map[NodeId]Node - Parent() Node - - GetValue() interface{} //get reference to the topology,dc,rack,datanode -} -type NodeImpl struct { - id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int - parent Node - children map[NodeId]Node - maxVolumeId storage.VolumeId - - //for rack, data center, topology - nodeType string - value interface{} -} - -func (n *NodeImpl) IsDataNode() bool { - return n.nodeType == "DataNode" -} -func (n *NodeImpl) IsRack() bool { - return n.nodeType == "Rack" -} -func (n *NodeImpl) IsDataCenter() bool { - return n.nodeType == "DataCenter" -} -func (n *NodeImpl) String() string { - if n.parent != nil { - return n.parent.String() + ":" + string(n.id) - } - return string(n.id) -} -func (n *NodeImpl) Id() NodeId { - return n.id -} -func (n *NodeImpl) FreeSpace() int { - return n.maxVolumeCount - n.volumeCount -} -func (n *NodeImpl) SetParent(node Node) { - n.parent = node -} -func (n *NodeImpl) Children() map[NodeId]Node { - return n.children -} -func (n *NodeImpl) Parent() Node { - return n.parent -} -func (n *NodeImpl) GetValue() interface{} { - return n.value -} -func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { - ret := false - var assignedNode *DataNode - for _, node := range n.children { - freeSpace := node.FreeSpace() - //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) - if freeSpace <= 0 { - continue - } - if r >= freeSpace { - r -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - ret, assignedNode = node.ReserveOneVolume(r, vid) - if ret { - break - } - } - } - return ret, assignedNode -} - -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta - if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta - if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative - if n.maxVolumeId < vid { - n.maxVolumeId = vid - if n.parent != nil { - n.parent.UpAdjustMaxVolumeId(vid) - } - } -} -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { - return n.maxVolumeId -} -func (n *NodeImpl) GetVolumeCount() int { - return n.volumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int { - return n.maxVolumeCount -} - -func (n *NodeImpl) LinkChildNode(node Node) { - if n.children[node.Id()] == nil { - n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) - n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) - node.SetParent(n) - fmt.Println(n, "adds child", node.Id()) - } -} - -func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { - node := n.children[nodeId] - node.SetParent(nil) - if node != nil { - delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) - fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) - } -} - -func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) { - if n.IsRack() { - for _, c := range n.Children() { - dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen < freshThreshHold { - if !dn.Dead { - dn.Dead = true - n.GetTopology().chanDeadDataNodes <- dn - } - } - for _, v := range dn.volumes { - if uint64(v.Size) >= volumeSizeLimit { - //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) - n.GetTopology().chanFullVolumes <- v - } - } - } - } else { - for _, c := range n.Children() { - c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit) - } - } -} - -func (n *NodeImpl) GetTopology() *Topology { - var p Node - p = n - for p.Parent() != nil { - p = p.Parent() - } - return p.GetValue().(*Topology) -} diff --git a/weed/topology/node_list.go b/weed/topology/node_list.go deleted file mode 100644 index 597d39b93..000000000 --- a/weed/topology/node_list.go +++ /dev/null @@ -1,69 +0,0 @@ -package topology - -import ( - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" -) - -type NodeList struct { - nodes map[NodeId]Node - except map[string]Node -} - -func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { - m := make(map[NodeId]Node, len(nodes)-len(except)) - for _, n := range nodes { - if except[n.String()] == nil { - m[n.Id()] = n - } - } - nl := &NodeList{nodes: m} - return nl -} - -func (nl *NodeList) FreeSpace() int { - freeSpace := 0 - for _, n := range nl.nodes { - freeSpace += n.FreeSpace() - } - return freeSpace -} - -func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { - var list []Node - for _, n := range nl.nodes { - if n.FreeSpace() >= min { - list = append(list, n) - } - } - if n > len(list) { - return nil, false - } - for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t - } - return list[len(list)-n:], true -} - -func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { - for _, node := range nl.nodes { - freeSpace := node.FreeSpace() - if randomVolumeIndex >= freeSpace { - randomVolumeIndex -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - children := node.Children() - newNodeList := NewNodeList(children, nl.except) - return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) - } - } - return false, nil - -} diff --git a/weed/topology/node_list_test.go b/weed/topology/node_list_test.go deleted file mode 100644 index 2fb4fa970..000000000 --- a/weed/topology/node_list_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package topology - -import ( - _ "fmt" - "strconv" - "testing" -) - -func TestXYZ(t *testing.T) { - topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) - for i := 0; i < 5; i++ { - dc := NewDataCenter("dc" + strconv.Itoa(i)) - dc.activeVolumeCount = i - dc.maxVolumeCount = 5 - topo.LinkChildNode(dc) - } - nl := NewNodeList(topo.Children(), nil) - - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked) != 1 { - t.Errorf("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked) != 4 { - t.Errorf("need to randomly pick 4 nodes") - } - - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked) != 5 { - t.Errorf("need to randomly pick 5 nodes") - } - - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked) != 0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } - -} diff --git a/weed/topology/rack.go b/weed/topology/rack.go deleted file mode 100644 index acc34417a..000000000 --- a/weed/topology/rack.go +++ /dev/null @@ -1,64 +0,0 @@ -package topology - -import ( - "strconv" - "time" -) - -type Rack struct { - NodeImpl -} - -func NewRack(id string) *Rack { - r := &Rack{} - r.id = NodeId(id) - r.nodeType = "Rack" - r.children = make(map[NodeId]Node) - r.NodeImpl.value = r - return r -} - -func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil -} -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - dn.LastSeen = time.Now().Unix() - if dn.Dead { - dn.Dead = false - r.GetTopology().chanRecoveredDataNodes <- dn - dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) - } - return dn - } - } - dn := NewDataNode(ip + ":" + strconv.Itoa(port)) - dn.Ip = ip - dn.Port = port - dn.PublicUrl = publicUrl - dn.maxVolumeCount = maxVolumeCount - dn.LastSeen = time.Now().Unix() - r.LinkChildNode(dn) - return dn -} - -func (rack *Rack) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = rack.GetMaxVolumeCount() - m["Free"] = rack.FreeSpace() - var dns []interface{} - for _, c := range rack.Children() { - dn := c.(*DataNode) - dns = append(dns, dn.ToMap()) - } - m["DataNodes"] = dns - return m -} diff --git a/weed/topology/topo_test.go b/weed/topology/topo_test.go deleted file mode 100644 index 99db15b5c..000000000 --- a/weed/topology/topo_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package topology - -import ( - "encoding/json" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" - "testing" - "time" -) - -var topologyLayout = ` -{ - "dc1":{ - "rack1":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":2, "size":12312}, - {"id":3, "size":12312} - ], - "limit":3 - }, - "server2":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":10 - } - }, - "rack2":{ - "server1":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":4 - }, - "server2":{ - "volumes":[], - "limit":4 - }, - "server3":{ - "volumes":[ - {"id":2, "size":12312}, - {"id":3, "size":12312}, - {"id":4, "size":12312} - ], - "limit":2 - } - } - }, - "dc2":{ - }, - "dc3":{ - "rack2":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":3, "size":12312}, - {"id":5, "size":12312} - ], - "limit":4 - } - } - } -} -` - -func setup(topologyLayout string) *Topology { - var data interface{} - err := json.Unmarshal([]byte(topologyLayout), &data) - if err != nil { - fmt.Println("error:", err) - } - - //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) - mTopology := data.(map[string]interface{}) - for dcKey, dcValue := range mTopology { - dc := NewDataCenter(dcKey) - dcMap := dcValue.(map[string]interface{}) - topo.LinkChildNode(dc) - for rackKey, rackValue := range dcMap { - rack := NewRack(rackKey) - rackMap := rackValue.(map[string]interface{}) - dc.LinkChildNode(rack) - for serverKey, serverValue := range rackMap { - server := NewDataNode(serverKey) - serverMap := serverValue.(map[string]interface{}) - rack.LinkChildNode(server) - for _, v := range serverMap["volumes"].([]interface{}) { - m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} - server.AddOrUpdateVolume(vi) - } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) - } - } - } - - return topo -} - -func TestRemoveDataCenter(t *testing.T) { - topo := setup(topologyLayout) - topo.UnlinkChildNode(NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { - t.Fail() - } - topo.UnlinkChildNode(NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { - t.Fail() - } -} - -func TestReserveOneVolume(t *testing.T) { - topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) - ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) - -} diff --git a/weed/topology/topology.go b/weed/topology/topology.go deleted file mode 100644 index ac5505a66..000000000 --- a/weed/topology/topology.go +++ /dev/null @@ -1,148 +0,0 @@ -package topology - -import ( - "errors" - "io/ioutil" - "math/rand" - "code.google.com/p/weed-fs/weed/directory" - "code.google.com/p/weed-fs/weed/sequence" - "code.google.com/p/weed-fs/weed/storage" -) - -type Topology struct { - NodeImpl - - //transient vid~servers mapping for each replication type - replicaType2VolumeLayout []*VolumeLayout - - pulse int64 - - volumeSizeLimit uint64 - - sequence sequence.Sequencer - - chanDeadDataNodes chan *DataNode - chanRecoveredDataNodes chan *DataNode - chanFullVolumes chan storage.VolumeInfo - - configuration *Configuration -} - -func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology { - t := &Topology{} - t.id = NodeId(id) - t.nodeType = "Topology" - t.NodeImpl.value = t - t.children = make(map[NodeId]Node) - t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) - t.pulse = int64(pulse) - t.volumeSizeLimit = volumeSizeLimit - - t.sequence = sequence.NewSequencer(dirname, sequenceFilename) - - t.chanDeadDataNodes = make(chan *DataNode) - t.chanRecoveredDataNodes = make(chan *DataNode) - t.chanFullVolumes = make(chan storage.VolumeInfo) - - t.loadConfiguration(confFile) - - return t -} - -func (t *Topology) loadConfiguration(configurationFile string) error { - b, e := ioutil.ReadFile(configurationFile) - if e == nil { - t.configuration, e = NewConfiguration(b) - } - return e -} - -func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - if list := vl.Lookup(vid); list != nil { - return list - } - } - } - return nil -} - -func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { - if t.FreeSpace() <= 0 { - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) - return ret, node, &vid -} - -func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) { - freeSpace := t.FreeSpace() - for _, node := range except { - freeSpace -= node.FreeSpace() - } - if freeSpace <= 0 { - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) - return ret, node, &vid -} - -func (t *Topology) NextVolumeId() storage.VolumeId { - vid := t.GetMaxVolumeId() - return vid.Next() -} - -func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) - if err != nil { - return "", 0, nil, errors.New("No writable volumes avalable!") - } - fileId, count := t.sequence.NextFileId(count) - return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil -} - -func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - return t.replicaType2VolumeLayout[replicationTypeIndex] -} - -func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) -} - -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { - dcName, rackName := t.configuration.Locate(ip) - dc := t.GetOrCreateDataCenter(dcName) - rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(ip, port) - if init && dn != nil { - t.UnRegisterDataNode(dn) - } - dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) - for _, v := range volumeInfos { - dn.AddOrUpdateVolume(v) - t.RegisterVolumeLayout(&v, dn) - } -} - -func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { - for _, c := range t.Children() { - dc := c.(*DataCenter) - if string(dc.Id()) == dcName { - return dc - } - } - dc := NewDataCenter(dcName) - t.LinkChildNode(dc) - return dc -} diff --git a/weed/topology/topology_compact.go b/weed/topology/topology_compact.go deleted file mode 100644 index 980f72a6e..000000000 --- a/weed/topology/topology_compact.go +++ /dev/null @@ -1,150 +0,0 @@ -package topology - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/util" - "time" -) - -func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { - ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url()) - if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { - //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e) - ch <- false - } else { - //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret) - ch <- ret - } - }(index, dn.Url(), vid) - } - isCheckSuccess := true - for _ = range locationlist.list { - select { - case canVacuum := <-ch: - isCheckSuccess = isCheckSuccess && canVacuum - case <-time.After(30 * time.Minute): - isCheckSuccess = false - break - } - } - return isCheckSuccess -} -func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { - vl.removeFromWritable(vid) - ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - fmt.Println(index, "Start vacuuming", vid, "on", dn.Url()) - if e := vacuumVolume_Compact(url, vid); e != nil { - fmt.Println(index, "Error when vacuuming", vid, "on", url, e) - ch <- false - } else { - fmt.Println(index, "Complete vacuuming", vid, "on", url) - ch <- true - } - }(index, dn.Url(), vid) - } - isVacuumSuccess := true - for _ = range locationlist.list { - select { - case _ = <-ch: - case <-time.After(30 * time.Minute): - isVacuumSuccess = false - break - } - } - return isVacuumSuccess -} -func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { - isCommitSuccess := true - for _, dn := range locationlist.list { - fmt.Println("Start Commiting vacuum", vid, "on", dn.Url()) - if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { - fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e) - isCommitSuccess = false - } else { - fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url()) - } - } - if isCommitSuccess { - vl.setVolumeWritable(vid) - } - return isCommitSuccess -} -func (t *Topology) Vacuum(garbageThreshold string) int { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) - } - } - } - } - } - return 0 -} - -type VacuumVolumeResult struct { - Result bool - Error string -} - -func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) - if err != nil { - fmt.Println("parameters:", values) - return err, false - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err, false - } - if ret.Error != "" { - return errors.New(ret.Error), false - } - return nil, ret.Result -} -func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { - values := make(url.Values) - values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values) - if err != nil { - return err - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} -func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { - values := make(url.Values) - values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values) - if err != nil { - return err - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go deleted file mode 100644 index 084dd5b57..000000000 --- a/weed/topology/topology_event_handling.go +++ /dev/null @@ -1,67 +0,0 @@ -package topology - -import ( - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" - "time" -) - -func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { - go func() { - for { - freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval - t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) - time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - go func(garbageThreshold string) { - c := time.Tick(15 * time.Minute) - for _ = range c { - t.Vacuum(garbageThreshold) - } - }(garbageThreshold) - go func() { - for { - select { - case v := <-t.chanFullVolumes: - t.SetVolumeCapacityFull(v) - case dn := <-t.chanRecoveredDataNodes: - t.RegisterRecoveredDataNode(dn) - fmt.Println("DataNode", dn, "is back alive!") - case dn := <-t.chanDeadDataNodes: - t.UnRegisterDataNode(dn) - fmt.Println("DataNode", dn, "is dead!") - } - } - }() -} -func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.RepType) - if !vl.SetVolumeCapacityFull(volumeInfo.Id) { - return false - } - for _, dn := range vl.vid2location[volumeInfo.Id].list { - dn.UpAdjustActiveVolumeCountDelta(-1) - } - return true -} -func (t *Topology) UnRegisterDataNode(dn *DataNode) { - for _, v := range dn.volumes { - fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.RepType) - vl.SetVolumeUnavailable(dn, v.Id) - } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) - dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) - dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) - dn.Parent().UnlinkChildNode(dn.Id()) -} -func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { - for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) - if vl.isWritable(&v) { - vl.SetVolumeAvailable(dn, v.Id) - } - } -} diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go deleted file mode 100644 index b416ee943..000000000 --- a/weed/topology/topology_map.go +++ /dev/null @@ -1,50 +0,0 @@ -package topology - -import () - -func (t *Topology) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() - var dcs []interface{} - for _, c := range t.Children() { - dc := c.(*DataCenter) - dcs = append(dcs, dc.ToMap()) - } - m["DataCenters"] = dcs - var layouts []interface{} - for _, layout := range t.replicaType2VolumeLayout { - if layout != nil { - layouts = append(layouts, layout.ToMap()) - } - } - m["layouts"] = layouts - return m -} - -func (t *Topology) ToVolumeMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() - dcs := make(map[NodeId]interface{}) - for _, c := range t.Children() { - dc := c.(*DataCenter) - racks := make(map[NodeId]interface{}) - for _, r := range dc.Children() { - rack := r.(*Rack) - dataNodes := make(map[NodeId]interface{}) - for _, d := range rack.Children() { - dn := d.(*DataNode) - var volumes []interface{} - for _, v := range dn.volumes { - volumes = append(volumes, v) - } - dataNodes[d.Id()] = volumes - } - racks[r.Id()] = dataNodes - } - dcs[dc.Id()] = racks - } - m["DataCenters"] = dcs - return m -} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go deleted file mode 100644 index 494630d9f..000000000 --- a/weed/topology/volume_layout.go +++ /dev/null @@ -1,116 +0,0 @@ -package topology - -import ( - "errors" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" -) - -type VolumeLayout struct { - repType storage.ReplicationType - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - pulse int64 - volumeSizeLimit uint64 -} - -func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { - return &VolumeLayout{ - repType: repType, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - pulse: pulse, - volumeSizeLimit: volumeSizeLimit, - } -} - -func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { - if _, ok := vl.vid2location[v.Id]; !ok { - vl.vid2location[v.Id] = NewVolumeLocationList() - } - if vl.vid2location[v.Id].Add(dn) { - if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { - if vl.isWritable(v) { - vl.writables = append(vl.writables, v.Id) - } - } - } -} - -func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion -} - -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { - return vl.vid2location[vid].list -} - -func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { - len_writers := len(vl.writables) - if len_writers <= 0 { - fmt.Println("No more writable volumes!") - return nil, 0, nil, errors.New("No more writable volumes!") - } - vid := vl.writables[rand.Intn(len_writers)] - locationList := vl.vid2location[vid] - if locationList != nil { - return &vid, count, locationList, nil - } - return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") -} - -func (vl *VolumeLayout) GetActiveVolumeCount() int { - return len(vl.writables) -} - -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { - for i, v := range vl.writables { - if v == vid { - fmt.Println("Volume", vid, "becomes unwritable") - vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) - return true - } - } - return false -} -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { - for _, v := range vl.writables { - if v == vid { - return false - } - } - fmt.Println("Volume", vid, "becomes writable") - vl.writables = append(vl.writables, vid) - return true -} - -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { - if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { - fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) - return vl.removeFromWritable(vid) - } - } - return false -} -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { - if vl.vid2location[vid].Add(dn) { - if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { - return vl.setVolumeWritable(vid) - } - } - return false -} - -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { - return vl.removeFromWritable(vid) -} - -func (vl *VolumeLayout) ToMap() interface{} { - m := make(map[string]interface{}) - m["replication"] = vl.repType.String() - m["writables"] = vl.writables - //m["locations"] = vl.vid2location - return m -} diff --git a/weed/topology/volume_location.go b/weed/topology/volume_location.go deleted file mode 100644 index 507a240b5..000000000 --- a/weed/topology/volume_location.go +++ /dev/null @@ -1,58 +0,0 @@ -package topology - -import () - -type VolumeLocationList struct { - list []*DataNode -} - -func NewVolumeLocationList() *VolumeLocationList { - return &VolumeLocationList{} -} - -func (dnll *VolumeLocationList) Head() *DataNode { - return dnll.list[0] -} - -func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) -} - -func (dnll *VolumeLocationList) Add(loc *DataNode) bool { - for _, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - return false - } - } - dnll.list = append(dnll.list, loc) - return true -} - -func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) - return true - } - } - return false -} - -func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { - var changed bool - for _, dnl := range dnll.list { - if dnl.LastSeen < freshThreshHold { - changed = true - break - } - } - if changed { - var l []*DataNode - for _, dnl := range dnll.list { - if dnl.LastSeen >= freshThreshHold { - l = append(l, dnl) - } - } - dnll.list = l - } -} diff --git a/weed/upload.go b/weed/upload.go deleted file mode 100644 index e1e296bf2..000000000 --- a/weed/upload.go +++ /dev/null @@ -1,113 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "os" - "path" - "code.google.com/p/weed-fs/weed/operation" - "code.google.com/p/weed-fs/weed/util" - "strconv" -) - -var uploadReplication *string - -func init() { - cmdUpload.Run = runUpload // break init cycle - cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") - server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") - uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)") -} - -var cmdUpload = &Command{ - UsageLine: "upload -server=localhost:9333 file1 [file2 file3]", - Short: "upload one or a list of files", - Long: `upload one or a list of files. - It uses consecutive file keys for the list of files. - e.g. If the file1 uses key k, file2 can be read via k_1 - - `, -} - -type AssignResult struct { - Fid string "fid" - Url string "url" - PublicUrl string "publicUrl" - Count int - Error string "error" -} - -func assign(count int) (*AssignResult, error) { - values := make(url.Values) - values.Add("count", strconv.Itoa(count)) - values.Add("replication", *uploadReplication) - jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) - debug("assign result :", string(jsonBlob)) - if err != nil { - return nil, err - } - var ret AssignResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, err - } - if ret.Count <= 0 { - return nil, errors.New(ret.Error) - } - return &ret, nil -} - -func upload(filename string, server string, fid string) (int, error) { - debug("Start uploading file:", filename) - fh, err := os.Open(filename) - if err != nil { - debug("Failed to open file:", filename) - return 0, err - } - ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh) - if e != nil { - return 0, e - } - return ret.Size, e -} - -type SubmitResult struct { - Fid string "fid" - Size int "size" - Error string "error" -} - -func submit(files []string) []SubmitResult { - ret, err := assign(len(files)) - if err != nil { - fmt.Println(err) - return nil - } - results := make([]SubmitResult, len(files)) - for index, file := range files { - fid := ret.Fid - if index > 0 { - fid = fid + "_" + strconv.Itoa(index) - } - results[index].Size, err = upload(file, ret.PublicUrl, fid) - if err != nil { - fid = "" - results[index].Error = err.Error() - } - results[index].Fid = fid - } - return results -} - -func runUpload(cmd *Command, args []string) bool { - *IsDebug = true - if len(cmdUpload.Flag.Args()) == 0 { - return false - } - results := submit(args) - bytes, _ := json.Marshal(results) - fmt.Print(string(bytes)) - return true -} diff --git a/weed/util/bytes.go b/weed/util/bytes.go deleted file mode 100644 index 6cc3d7018..000000000 --- a/weed/util/bytes.go +++ /dev/null @@ -1,33 +0,0 @@ -package util - -func BytesToUint64(b []byte) (v uint64) { - length := uint(len(b)) - for i := uint(0); i < length-1; i++ { - v += uint64(b[i]) - v <<= 8 - } - v += uint64(b[length-1]) - return -} -func BytesToUint32(b []byte) (v uint32) { - length := uint(len(b)) - for i := uint(0); i < length-1; i++ { - v += uint32(b[i]) - v <<= 8 - } - v += uint32(b[length-1]) - return -} -func Uint64toBytes(b []byte, v uint64) { - for i := uint(0); i < 8; i++ { - b[7-i] = byte(v >> (i * 8)) - } -} -func Uint32toBytes(b []byte, v uint32) { - for i := uint(0); i < 4; i++ { - b[3-i] = byte(v >> (i * 8)) - } -} -func Uint8toBytes(b []byte, v uint8) { - b[0] = byte(v) -} diff --git a/weed/util/config.go b/weed/util/config.go deleted file mode 100644 index 6ac8a3a65..000000000 --- a/weed/util/config.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2011 Numerotron Inc. -// Use of this source code is governed by an MIT-style license -// that can be found in the LICENSE file. -// -// Developed at www.stathat.com by Patrick Crosby -// Contact us on twitter with any questions: twitter.com/stat_hat - -// The jconfig package provides a simple, basic configuration file parser using JSON. -package util - -import ( - "bytes" - "encoding/json" - "log" - "os" -) - -type Config struct { - data map[string]interface{} - filename string -} - -func newConfig() *Config { - result := new(Config) - result.data = make(map[string]interface{}) - return result -} - -// Loads config information from a JSON file -func LoadConfig(filename string) *Config { - result := newConfig() - result.filename = filename - err := result.parse() - if err != nil { - log.Fatalf("error loading config file %s: %s", filename, err) - } - return result -} - -// Loads config information from a JSON string -func LoadConfigString(s string) *Config { - result := newConfig() - err := json.Unmarshal([]byte(s), &result.data) - if err != nil { - log.Fatalf("error parsing config string %s: %s", s, err) - } - return result -} - -func (c *Config) StringMerge(s string) { - next := LoadConfigString(s) - c.merge(next.data) -} - -func (c *Config) LoadMerge(filename string) { - next := LoadConfig(filename) - c.merge(next.data) -} - -func (c *Config) merge(ndata map[string]interface{}) { - for k, v := range ndata { - c.data[k] = v - } -} - -func (c *Config) parse() error { - f, err := os.Open(c.filename) - if err != nil { - return err - } - defer f.Close() - b := new(bytes.Buffer) - _, err = b.ReadFrom(f) - if err != nil { - return err - } - err = json.Unmarshal(b.Bytes(), &c.data) - if err != nil { - return err - } - - return nil -} - -// Returns a string for the config variable key -func (c *Config) GetString(key string) string { - result, present := c.data[key] - if !present { - return "" - } - return result.(string) -} - -// Returns an int for the config variable key -func (c *Config) GetInt(key string) int { - x, ok := c.data[key] - if !ok { - return -1 - } - return int(x.(float64)) -} - -// Returns a float for the config variable key -func (c *Config) GetFloat(key string) float64 { - x, ok := c.data[key] - if !ok { - return -1 - } - return x.(float64) -} - -// Returns a bool for the config variable key -func (c *Config) GetBool(key string) bool { - x, ok := c.data[key] - if !ok { - return false - } - return x.(bool) -} - -// Returns an array for the config variable key -func (c *Config) GetArray(key string) []interface{} { - result, present := c.data[key] - if !present { - return []interface{}(nil) - } - return result.([]interface{}) -} diff --git a/weed/util/parse.go b/weed/util/parse.go deleted file mode 100644 index 930da9522..000000000 --- a/weed/util/parse.go +++ /dev/null @@ -1,16 +0,0 @@ -package util - -import ( - "strconv" -) - -func ParseInt(text string, defaultValue int) int { - count, parseError := strconv.ParseUint(text, 10, 64) - if parseError != nil { - if len(text) > 0 { - return 0 - } - return defaultValue - } - return int(count) -} diff --git a/weed/util/post.go b/weed/util/post.go deleted file mode 100644 index 6e6ab0003..000000000 --- a/weed/util/post.go +++ /dev/null @@ -1,23 +0,0 @@ -package util - -import ( - "io/ioutil" - "log" - "net/http" - "net/url" -) - -func Post(url string, values url.Values) ([]byte, error) { - r, err := http.PostForm(url, values) - if err != nil { - log.Println("post to", url, err) - return nil, err - } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Println("read post result from", url, err) - return nil, err - } - return b, nil -} diff --git a/weed/version.go b/weed/version.go deleted file mode 100644 index b418126a4..000000000 --- a/weed/version.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "fmt" - "runtime" -) - -const ( - VERSION = "0.28 beta" -) - -var cmdVersion = &Command{ - Run: runVersion, - UsageLine: "version", - Short: "print Weed File System version", - Long: `Version prints the Weed File System version`, -} - -func runVersion(cmd *Command, args []string) bool { - if len(args) != 0 { - cmd.Usage() - } - - fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) - return true -} diff --git a/weed/volume.go b/weed/volume.go deleted file mode 100644 index 8bfc7681a..000000000 --- a/weed/volume.go +++ /dev/null @@ -1,378 +0,0 @@ -package main - -import ( - "bytes" - "log" - "math/rand" - "mime" - "net/http" - "os" - "code.google.com/p/weed-fs/weed/operation" - "code.google.com/p/weed-fs/weed/storage" - "runtime" - "strconv" - "strings" - "time" -) - -func init() { - cmdVolume.Run = runVolume // break init cycle - cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdVolume = &Command{ - UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333", - Short: "start a volume server", - Long: `start a volume server to provide storage spaces - - `, -} - -var ( - vport = cmdVolume.Flag.Int("port", 8080, "http listen port") - volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files") - ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") - publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>") - masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") - vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") - maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes") - vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - - store *storage.Store -) - -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func statusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = store.Status() - writeJson(w, r, m) -} -func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) - if err == nil { - writeJson(w, r, map[string]string{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) -} -func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { - err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) - if err == nil { - writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) - } else { - writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) - } - debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) -} -func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := store.CompactVolume(r.FormValue("volume")) - if err == nil { - writeJson(w, r, map[string]string{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("compacted volume =", r.FormValue("volume"), ", error =", err) -} -func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - err := store.CommitCompactVolume(r.FormValue("volume")) - if err == nil { - writeJson(w, r, map[string]interface{}{"error": ""}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } - debug("commit compact volume =", r.FormValue("volume"), ", error =", err) -} -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetHandler(w, r) - case "DELETE": - DeleteHandler(w, r) - case "POST": - PostHandler(w, r) - } -} -func GetHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, ext := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - n.ParsePath(fid) - - debug("volume", volumeId, "reading", n) - if !store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(*masterNode, volumeId) - debug("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil { - http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - debug("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - } - return - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - debug("read bytes", count, "error", e) - if e != nil || count <= 0 { - debug("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - if n.Cookie != cookie { - log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(http.StatusNotFound) - return - } - if n.NameSize > 0 { - fname := string(n.Name) - dotIndex := strings.LastIndex(fname, ".") - if dotIndex > 0 { - ext = fname[dotIndex:] - } - } - mtype := "" - if ext != "" { - mtype = mime.TypeByExtension(ext) - } - if n.MimeSize > 0 { - mtype = string(n.Mime) - } - if mtype != "" { - w.Header().Set("Content-Type", mtype) - } - if n.NameSize > 0 { - w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name))) - } - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = storage.UnGzipData(n.Data); err != nil { - debug("lookup error:", err, r.URL.Path) - } - } - } - } - w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - w.Write(n.Data) -} -func PostHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, e := storage.NewVolumeId(vid) - if e != nil { - writeJson(w, r, e) - } else { - needle, filename, ne := storage.NewNeedle(r) - if ne != nil { - writeJson(w, r, ne) - } else { - ret, err := store.Write(volumeId, needle) - errorStatus := "" - needToReplicate := !store.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() - } else { - errorStatus = "Failed to write to local disk" - } - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } - } - } - m := make(map[string]interface{}) - if errorStatus == "" { - w.WriteHeader(http.StatusCreated) - } else { - store.Delete(volumeId, needle) - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) - w.WriteHeader(http.StatusInternalServerError) - m["error"] = errorStatus - } - m["size"] = ret - writeJson(w, r, m) - } - } -} -func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) - - debug("deleting", n) - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJson(w, r, m) - return - } - - if n.Cookie != cookie { - log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - ret, err := store.Delete(volumeId, n) - if err != nil { - log.Printf("delete error: %s\n", err) - return - } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) { - ret = 0 - } - } - } - - if ret != 0 { - w.WriteHeader(http.StatusAccepted) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJson(w, r, m) -} - -func parseURLPath(path string) (vid, fid, ext string) { - - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - if "favicon.ico" != path[sepIndex+1:] { - log.Println("unknown file id", path[sepIndex+1:]) - } - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - return -} - -func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - length := 0 - selfUrl := (*ip + ":" + strconv.Itoa(*vport)) - results := make(chan bool) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan bool) { - results <- op(location) - }(location, results) - } - } - ret := true - for i := 0; i < length; i++ { - ret = ret && <-results - } - return ret - } else { - log.Println("Failed to lookup for", volumeId, lookupErr.Error()) - } - return false -} - -func runVolume(cmd *Command, args []string) bool { - if *vMaxCpu < 1 { - *vMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*vMaxCpu) - fileInfo, err := os.Stat(*volumeFolder) - if err != nil { - log.Fatalf("No Existing Folder:%s", *volumeFolder) - } - if !fileInfo.IsDir() { - log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) - } - perm := fileInfo.Mode().Perm() - log.Println("Volume Folder permission:", perm) - - if *publicUrl == "" { - *publicUrl = *ip + ":" + strconv.Itoa(*vport) - } - - store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) - defer store.Close() - http.HandleFunc("/", storeHandler) - http.HandleFunc("/status", statusHandler) - http.HandleFunc("/admin/assign_volume", assignVolumeHandler) - http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) - http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) - http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) - - go func() { - connected := true - store.SetMaster(*masterNode) - for { - err := store.Join() - if err == nil { - if !connected { - connected = true - log.Println("Reconnected with master") - } - } else { - if connected { - connected = false - } - } - time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - log.Println("store joined at", *masterNode) - - log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) - srv := &http.Server{ - Addr: ":" + strconv.Itoa(*vport), - Handler: http.DefaultServeMux, - ReadTimeout: (time.Duration(*vReadTimeout) * time.Second), - } - e := srv.ListenAndServe() - if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) - } - return true -} diff --git a/weed/weed.go b/weed/weed.go deleted file mode 100644 index c03cb68ac..000000000 --- a/weed/weed.go +++ /dev/null @@ -1,199 +0,0 @@ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "io" - "math/rand" - "net/http" - "os" - "strings" - "sync" - "text/template" - "time" - "unicode" - "unicode/utf8" -) - -var IsDebug *bool -var server *string - -var commands = []*Command{ - cmdFix, - cmdMaster, - cmdUpload, - cmdShell, - cmdVersion, - cmdVolume, - cmdExport, -} - -var exitStatus = 0 -var exitMu sync.Mutex - -func setExitStatus(n int) { - exitMu.Lock() - if exitStatus < n { - exitStatus = n - } - exitMu.Unlock() -} - -func main() { - rand.Seed(time.Now().UnixNano()) - flag.Usage = usage - flag.Parse() - - args := flag.Args() - if len(args) < 1 { - usage() - } - - if args[0] == "help" { - help(args[1:]) - for _, cmd := range commands { - if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil { - fmt.Fprintf(os.Stderr, "Default Parameters:\n") - cmd.Flag.PrintDefaults() - } - } - return - } - - for _, cmd := range commands { - if cmd.Name() == args[0] && cmd.Run != nil { - cmd.Flag.Usage = func() { cmd.Usage() } - cmd.Flag.Parse(args[1:]) - args = cmd.Flag.Args() - IsDebug = cmd.IsDebug - if !cmd.Run(cmd, args) { - fmt.Fprintf(os.Stderr, "\n") - cmd.Flag.Usage() - fmt.Fprintf(os.Stderr, "Default Parameters:\n") - cmd.Flag.PrintDefaults() - } - exit() - return - } - } - - fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0]) - setExitStatus(2) - exit() -} - -var usageTemplate = `WeedFS is a software to store billions of files and serve them fast! - -Usage: - - weed command [arguments] - -The commands are: -{{range .}}{{if .Runnable}} - {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}} - -Use "weed help [command]" for more information about a command. - -` - -var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}} -{{end}} - {{.Long}} -` - -// tmpl executes the given template text on data, writing the result to w. -func tmpl(w io.Writer, text string, data interface{}) { - t := template.New("top") - t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize}) - template.Must(t.Parse(text)) - if err := t.Execute(w, data); err != nil { - panic(err) - } -} - -func capitalize(s string) string { - if s == "" { - return s - } - r, n := utf8.DecodeRuneInString(s) - return string(unicode.ToTitle(r)) + s[n:] -} - -func printUsage(w io.Writer) { - tmpl(w, usageTemplate, commands) -} - -func usage() { - printUsage(os.Stderr) - os.Exit(2) -} - -// help implements the 'help' command. -func help(args []string) { - if len(args) == 0 { - printUsage(os.Stdout) - // not exit 2: succeeded at 'weed help'. - return - } - if len(args) != 1 { - fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n") - os.Exit(2) // failed at 'weed help' - } - - arg := args[0] - - for _, cmd := range commands { - if cmd.Name() == arg { - tmpl(os.Stdout, helpTemplate, cmd) - // not exit 2: succeeded at 'weed help cmd'. - return - } - } - - fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg) - os.Exit(2) // failed at 'weed help cmd' -} - -var atexitFuncs []func() - -func atexit(f func()) { - atexitFuncs = append(atexitFuncs, f) -} - -func exit() { - for _, f := range atexitFuncs { - f() - } - os.Exit(exitStatus) -} - -func exitIfErrors() { - if exitStatus != 0 { - exit() - } -} -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { - w.Header().Set("Content-Type", "application/javascript") - var bytes []byte - if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") - } else { - bytes, _ = json.Marshal(obj) - } - callback := r.FormValue("callback") - if callback == "" { - w.Write(bytes) - } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) - fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) - } -} - -func debug(params ...interface{}) { - if *IsDebug { - fmt.Println(params) - } -} |
