Diffstat (limited to 'src')
51 files changed, 4799 insertions, 0 deletions
diff --git a/src/weed/command.go b/src/weed/command.go new file mode 100644 index 000000000..c8d86ca66 --- /dev/null +++ b/src/weed/command.go @@ -0,0 +1,54 @@ +package main + +import ( + "flag" + "fmt" + "os" + "strings" +) + +type Command struct { + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool + + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string + + // Short is the short description shown in the 'go help' output. + Short string + + // Long is the long message shown in the 'go help <this-command>' output. + Long string + + // Flag is a set of flags specific to this command. + Flag flag.FlagSet + + IsDebug *bool +} + +// Name returns the command's name: the first word in the usage line. +func (c *Command) Name() string { + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name +} + +func (c *Command) Usage() { + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) +} + +// Runnable reports whether the command can be run; otherwise +// it is a documentation pseudo-command such as importpath. +func (c *Command) Runnable() bool { + return c.Run != nil +} diff --git a/src/weed/directory/file_id.go b/src/weed/directory/file_id.go new file mode 100644 index 000000000..b70f97fd4 --- /dev/null +++ b/src/weed/directory/file_id.go @@ -0,0 +1,38 @@ +package directory + +import ( + "encoding/hex" + "weed/storage" + "weed/util" + "strings" +) + +type FileId struct { + VolumeId storage.VolumeId + Key uint64 + Hashcode uint32 +} + +func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId { + return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode} +} +func ParseFileId(fid string) *FileId { + a := strings.Split(fid, ",") + if len(a) != 2 { + println("Invalid fid", fid, ", split length", len(a)) + return nil + } + vid_string, key_hash_string := a[0], a[1] + volumeId, _ := storage.NewVolumeId(vid_string) + key, hash := storage.ParseKeyHash(key_hash_string) + return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash} +} +func (n *FileId) String() string { + bytes := make([]byte, 12) + util.Uint64toBytes(bytes[0:8], n.Key) + util.Uint32toBytes(bytes[8:12], n.Hashcode) + nonzero_index := 0 + for ; bytes[nonzero_index] == 0; nonzero_index++ { + } + return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:]) +} diff --git a/src/weed/export.go b/src/weed/export.go new file mode 100644 index 000000000..3c31b6ab7 --- /dev/null +++ b/src/weed/export.go @@ -0,0 +1,164 @@ +package main + +import ( + "archive/tar" + "bytes" + "fmt" + "log" + "os" + "path" + "weed/directory" + "weed/storage" + "strconv" + "strings" + "text/template" + "time" +) + +func init() { + cmdExport.Run = runExport // break init cycle + cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode") +} + +const ( + defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}` +) + +var cmdExport = &Command{ + UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}", + Short: "list or export files from one volume data file", + Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified. 
+ + The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}. + + `, +} + +var ( + exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files") + exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") + dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") + format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}") + tarFh *tar.Writer + tarHeader tar.Header + fnTmpl *template.Template + fnTmplBuf = bytes.NewBuffer(nil) +) + +func runExport(cmd *Command, args []string) bool { + + if *exportVolumeId == -1 { + return false + } + + var err error + if *dest != "" { + if *dest != "-" && !strings.HasSuffix(*dest, ".tar") { + fmt.Println("the output file", *dest, "should be '-' or end with .tar") + return false + } + + if fnTmpl, err = template.New("name").Parse(*format); err != nil { + fmt.Println("cannot parse format " + *format + ": " + err.Error()) + return false + } + + var fh *os.File + if *dest == "-" { + fh = os.Stdout + } else { + if fh, err = os.Create(*dest); err != nil { + log.Fatalf("cannot open output tar %s: %s", *dest, err) + } + } + defer fh.Close() + tarFh = tar.NewWriter(fh) + defer tarFh.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + fileName := strconv.Itoa(*exportVolumeId) + vid := storage.VolumeId(*exportVolumeId) + indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644) + if err != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + nm := storage.LoadNeedleMap(indexFile) + + var version storage.Version + + err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error { + version = superBlock.Version + return nil + }, func(n *storage.Needle, offset uint32) error { + debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) + nv, ok := nm.Get(n.Id) + if ok && nv.Size > 0 { + return walker(vid, n, version) + } else { + if !ok { + debug("This seems deleted", n.Id) + } else { + debug("Id", n.Id, "size", n.Size) + } + } + return nil + }) + if err != nil { + log.Fatalf("Export Volume File [ERROR] %s\n", err) + } + return true +} + +type nameParams struct { + Name string + Id uint64 + Mime string + Key string +} + +func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { + key := directory.NewFileId(vid, n.Id, n.Cookie).String() + if tarFh != nil { + fnTmplBuf.Reset() + if err = fnTmpl.Execute(fnTmplBuf, + nameParams{Name: string(n.Name), + Id: n.Id, + Mime: string(n.Mime), + Key: key, + }, + ); err != nil { + return err + } + nm := fnTmplBuf.String() + + if n.IsGzipped() && path.Ext(nm) != ".gz" { + nm = nm + ".gz" + } + + tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data)) + if err = tarFh.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarFh.Write(n.Data) + } else { + size := n.DataSize + if version == storage.Version1 { + size = n.Size + } + fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n", + key, + n.Name, + size, + n.IsGzipped(), + n.Mime, + ) + } + return +} diff --git a/src/weed/fix.go 
b/src/weed/fix.go new file mode 100644 index 000000000..0a21965b7 --- /dev/null +++ b/src/weed/fix.go @@ -0,0 +1,64 @@ +package main + +import ( + "log" + "os" + "path" + "weed/storage" + "strconv" +) + +func init() { + cmdFix.Run = runFix // break init cycle + cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdFix = &Command{ + UsageLine: "fix -dir=/tmp -volumeId=234", + Short: "run weed tool fix on index file if corrupted", + Long: `Fix runs the WeedFS fix command to re-create the index .idx file. + + `, +} + +var ( + fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files") + fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func runFix(cmd *Command, args []string) bool { + + if *fixVolumeId == -1 { + return false + } + + fileName := strconv.Itoa(*fixVolumeId) + indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + log.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + nm := storage.NewNeedleMap(indexFile) + defer nm.Close() + + vid := storage.VolumeId(*fixVolumeId) + err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error { + return nil + }, func(n *storage.Needle, offset uint32) error { + debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped()) + if n.Size > 0 { + count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size) + debug("saved", count, "with error", pe) + } else { + debug("skipping deleted file ...") + nm.Delete(n.Id) + } + return nil + }) + if err != nil { + log.Fatalf("Export Volume File [ERROR] %s\n", err) + } + + return true +} diff --git a/src/weed/master.go b/src/weed/master.go new file mode 100644 index 000000000..a06c8a99a --- /dev/null +++ b/src/weed/master.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "weed/replication" + "weed/storage" + "weed/topology" + "runtime" + "strconv" + "strings" + "time" +) + +func init() { + cmdMaster.Run = runMaster // break init cycle + cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdMaster = &Command{ + UsageLine: "master -port=9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service + and sequence number of file ids + + `, +} + +var ( + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") + mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 
0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") +) + +var topo *topology.Topology +var vg *replication.VolumeGrowth + +func dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := topo.Lookup(volumeId) + if machines != nil { + ret := []map[string]string{} + for _, dn := range machines { + ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) + } + writeJson(w, r, map[string]interface{}{"locations": ret}) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + } +} + +func dirAssignHandler(w http.ResponseWriter, r *http.Request) { + c, e := strconv.Atoi(r.FormValue("count")) + if e != nil { + c = 1 + } + repType := r.FormValue("replication") + if repType == "" { + repType = *defaultRepType + } + rt, err := storage.NewReplicationTypeFromString(repType) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + return + } + if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { + if topo.FreeSpace() <= 0 { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + return + } else { + vg.GrowByType(rt, topo) + } + } + fid, count, dn, err := topo.PickForWrite(rt, c) + if err == nil { + writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} + +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init") == "true" + ip := r.FormValue("ip") + if ip == "" { + ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + debug(s, "volumes", r.FormValue("volumes")) + topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 + writeJson(w, r, m) +} + +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Topology"] = topo.ToMap() + writeJson(w, r, m) +} + +func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = *garbageThreshold + } + debug("garbageThreshold =", gcThreshold) + topo.Vacuum(gcThreshold) + dirStatusHandler(w, r) +} + +func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if topo.FreeSpace() < count*rt.GetCopyCount() { + err = 
errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = vg.GrowByCountAndType(count, rt, topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]interface{}{"count": count}) + } +} + +func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = topo.ToVolumeMap() + writeJson(w, r, m) +} + +func redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + machines := topo.Lookup(volumeId) + if machines != nil && len(machines) > 0 { + http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } +} + +func runMaster(cmd *Command, args []string) bool { + if *mMaxCpu < 1 { + *mMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*mMaxCpu) + topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + vg = replication.NewDefaultVolumeGrowth() + log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) + http.HandleFunc("/vol/grow", volumeGrowHandler) + http.HandleFunc("/vol/status", volumeStatusHandler) + http.HandleFunc("/vol/vacuum", volumeVacuumHandler) + + http.HandleFunc("/", redirectHandler) + + topo.StartRefreshWritableVolumes(*garbageThreshold) + + log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) + srv := &http.Server{ + Addr: ":" + strconv.Itoa(*mport), + Handler: http.DefaultServeMux, + ReadTimeout: time.Duration(*mReadTimeout) * time.Second, + } + e := srv.ListenAndServe() + if e != nil { + log.Fatalf("Fail to start:%s", e.Error()) + } + return true +} diff --git a/src/weed/operation/allocate_volume.go b/src/weed/operation/allocate_volume.go new file mode 100644 index 000000000..1e016abf7 --- /dev/null +++ b/src/weed/operation/allocate_volume.go @@ -0,0 +1,32 @@ +package operation + +import ( + "encoding/json" + "errors" + "net/url" + "weed/storage" + "weed/topology" + "weed/util" +) + +type AllocateVolumeResult struct { + Error string +} + +func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("replicationType", repType.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} diff --git a/src/weed/operation/delete_content.go b/src/weed/operation/delete_content.go new file mode 100644 index 000000000..2bdb49651 --- /dev/null +++ b/src/weed/operation/delete_content.go @@ -0,0 
+1,16 @@ +package operation + +import ( + "log" + "net/http" +) + +func Delete(url string) error { + req, err := http.NewRequest("DELETE", url, nil) + if err != nil { + log.Println("failing to delete", url) + return err + } + _, err = http.DefaultClient.Do(req) + return err +} diff --git a/src/weed/operation/lookup_volume_id.go b/src/weed/operation/lookup_volume_id.go new file mode 100644 index 000000000..ccfca67c8 --- /dev/null +++ b/src/weed/operation/lookup_volume_id.go @@ -0,0 +1,38 @@ +package operation + +import ( + "encoding/json" + "errors" + _ "fmt" + "net/url" + "weed/storage" + "weed/util" +) + +type Location struct { + Url string "url" + PublicUrl string "publicUrl" +} +type LookupResult struct { + Locations []Location "locations" + Error string "error" +} + +//TODO: Add a caching for vid here +func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { + values := make(url.Values) + values.Add("volumeId", vid.String()) + jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) + if err != nil { + return nil, err + } + var ret LookupResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil +} diff --git a/src/weed/operation/upload_content.go b/src/weed/operation/upload_content.go new file mode 100644 index 000000000..0bdb697da --- /dev/null +++ b/src/weed/operation/upload_content.go @@ -0,0 +1,47 @@ +package operation + +import ( + "bytes" + "encoding/json" + "errors" + _ "fmt" + "io" + "io/ioutil" + "log" + "mime/multipart" + "net/http" +) + +type UploadResult struct { + Size int + Error string +} + +func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) { + body_buf := bytes.NewBufferString("") + body_writer := multipart.NewWriter(body_buf) + file_writer, err := body_writer.CreateFormFile("file", filename) + io.Copy(file_writer, reader) + content_type := body_writer.FormDataContentType() + body_writer.Close() + resp, err := http.Post(uploadUrl, content_type, body_buf) + if err != nil { + log.Println("failing to upload to", uploadUrl) + return nil, err + } + defer resp.Body.Close() + resp_body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var ret UploadResult + err = json.Unmarshal(resp_body, &ret) + if err != nil { + log.Println("failing to read upload resonse", uploadUrl, resp_body) + return nil, err + } + if ret.Error != "" { + return nil, errors.New(ret.Error) + } + return &ret, nil +} diff --git a/src/weed/replication/volume_growth.go b/src/weed/replication/volume_growth.go new file mode 100644 index 000000000..0aae05bb3 --- /dev/null +++ b/src/weed/replication/volume_growth.go @@ -0,0 +1,195 @@ +package replication + +import ( + "errors" + "fmt" + "math/rand" + "weed/operation" + "weed/storage" + "weed/topology" + "sync" +) + +/* +This package is created to resolve these replica placement issues: +1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies +2. in time of tight storage, how to reduce replica level +3. optimizing for hot data on faster disk, cold data on cheaper storage, +4. 
volume allocation for each bucket +*/ + +type VolumeGrowth struct { + copy1factor int + copy2factor int + copy3factor int + copyAll int + + accessLock sync.Mutex +} + +func NewDefaultVolumeGrowth() *VolumeGrowth { + return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3} +} + +func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) { + switch repType { + case storage.Copy000: + return vg.GrowByCountAndType(vg.copy1factor, repType, topo) + case storage.Copy001: + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy010: + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy100: + return vg.GrowByCountAndType(vg.copy2factor, repType, topo) + case storage.Copy110: + return vg.GrowByCountAndType(vg.copy3factor, repType, topo) + case storage.Copy200: + return vg.GrowByCountAndType(vg.copy3factor, repType, topo) + } + return 0, errors.New("Unknown Replication Type!") +} +func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) { + vg.accessLock.Lock() + defer vg.accessLock.Unlock() + + counter = 0 + switch repType { + case storage.Copy000: + for i := 0; i < count; i++ { + if ok, server, vid := topo.RandomlyReserveOneVolume(); ok { + if err = vg.grow(topo, *vid, repType, server); err == nil { + counter++ + } + } + } + case storage.Copy001: + for i := 0; i < count; i++ { + //randomly pick one server, and then choose from the same rack + if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { + rack := server1.Parent() + exclusion := make(map[string]topology.Node) + exclusion[server1.String()] = server1 + newNodeList := topology.NewNodeList(rack.Children(), exclusion) + if newNodeList.FreeSpace() > 0 { + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { + if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + counter++ + } + } + } + } + } + case storage.Copy010: + for i := 0; i < count; i++ { + //randomly pick one server, and then choose from the same rack + if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok { + rack := server1.Parent() + dc := rack.Parent() + exclusion := make(map[string]topology.Node) + exclusion[rack.String()] = rack + newNodeList := topology.NewNodeList(dc.Children(), exclusion) + if newNodeList.FreeSpace() > 0 { + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 { + if err = vg.grow(topo, *vid, repType, server1, server2); err == nil { + counter++ + } + } + } + } + } + case storage.Copy100: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(2, 1) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + for _, n := range picked { + if n.FreeSpace() > 0 { + if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + } + if len(servers) == 2 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } + } + case storage.Copy110: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(2, 2) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + dc1, dc2 := picked[0], picked[1] + if dc2.FreeSpace() > dc1.FreeSpace() { + dc1, dc2 = dc2, dc1 + } + if dc1.FreeSpace() > 0 { + if ok, server1 := 
dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok { + servers = append(servers, server1) + rack := server1.Parent() + exclusion := make(map[string]topology.Node) + exclusion[rack.String()] = rack + newNodeList := topology.NewNodeList(dc1.Children(), exclusion) + if newNodeList.FreeSpace() > 0 { + if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 { + servers = append(servers, server2) + } + } + } + } + if dc2.FreeSpace() > 0 { + if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + if len(servers) == 3 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } + } + case storage.Copy200: + for i := 0; i < count; i++ { + nl := topology.NewNodeList(topo.Children(), nil) + picked, ret := nl.RandomlyPickN(3, 1) + vid := topo.NextVolumeId() + if ret { + var servers []*topology.DataNode + for _, n := range picked { + if n.FreeSpace() > 0 { + if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok { + servers = append(servers, server) + } + } + } + if len(servers) == 3 { + if err = vg.grow(topo, vid, repType, servers...); err == nil { + counter++ + } + } + } + } + } + return +} +func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error { + for _, server := range servers { + if err := operation.AllocateVolume(server, vid, repType); err == nil { + vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion} + server.AddOrUpdateVolume(vi) + topo.RegisterVolumeLayout(&vi, server) + fmt.Println("Created Volume", vid, "on", server) + } else { + fmt.Println("Failed to assign", vid, "to", servers) + return errors.New("Failed to assign " + vid.String()) + } + } + return nil +} diff --git a/src/weed/replication/volume_growth_test.go b/src/weed/replication/volume_growth_test.go new file mode 100644 index 000000000..ed7467785 --- /dev/null +++ b/src/weed/replication/volume_growth_test.go @@ -0,0 +1,129 @@ +package replication + +import ( + "encoding/json" + "fmt" + "math/rand" + "weed/storage" + "weed/topology" + "testing" + "time" +) + +var topologyLayout = ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":3 + }, + "server2":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":10 + } + }, + "rack2":{ + "server1":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":4 + }, + "server2":{ + "volumes":[], + "limit":4 + }, + "server3":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":2 + } + } + }, + "dc2":{ + }, + "dc3":{ + "rack2":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":3, "size":12312}, + {"id":5, "size":12312} + ], + "limit":4 + } + } + } +} +` + +func setup(topologyLayout string) *topology.Topology { + var data interface{} + err := json.Unmarshal([]byte(topologyLayout), &data) + if err != nil { + fmt.Println("error:", err) + } + fmt.Println("data:", data) + + //need to connect all nodes first before server adding volumes + topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5) + mTopology := data.(map[string]interface{}) + for dcKey, dcValue := range mTopology { + dc := 
topology.NewDataCenter(dcKey) + dcMap := dcValue.(map[string]interface{}) + topo.LinkChildNode(dc) + for rackKey, rackValue := range dcMap { + rack := topology.NewRack(rackKey) + rackMap := rackValue.(map[string]interface{}) + dc.LinkChildNode(rack) + for serverKey, serverValue := range rackMap { + server := topology.NewDataNode(serverKey) + serverMap := serverValue.(map[string]interface{}) + rack.LinkChildNode(server) + for _, v := range serverMap["volumes"].([]interface{}) { + m := v.(map[string]interface{}) + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + server.AddOrUpdateVolume(vi) + } + server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + } + } + } + + return topo +} + +func TestRemoveDataCenter(t *testing.T) { + topo := setup(topologyLayout) + topo.UnlinkChildNode(topology.NodeId("dc2")) + if topo.GetActiveVolumeCount() != 15 { + t.Fail() + } + topo.UnlinkChildNode(topology.NodeId("dc3")) + if topo.GetActiveVolumeCount() != 12 { + t.Fail() + } +} + +func TestReserveOneVolume(t *testing.T) { + topo := setup(topologyLayout) + rand.Seed(time.Now().UnixNano()) + vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4} + if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil { + t.Log("reserved", c) + } +} diff --git a/src/weed/sequence/sequence.go b/src/weed/sequence/sequence.go new file mode 100644 index 000000000..c85289468 --- /dev/null +++ b/src/weed/sequence/sequence.go @@ -0,0 +1,71 @@ +package sequence + +import ( + "encoding/gob" + "log" + "os" + "path" + "sync" +) + +const ( + FileIdSaveInterval = 10000 +) + +type Sequencer interface { + NextFileId(count int) (uint64, int) +} +type SequencerImpl struct { + dir string + fileName string + + volumeLock sync.Mutex + sequenceLock sync.Mutex + + FileIdSequence uint64 + fileIdCounter uint64 +} + +func NewSequencer(dirname string, filename string) (m *SequencerImpl) { + m = &SequencerImpl{dir: dirname, fileName: filename} + + seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644) + if se != nil { + m.FileIdSequence = FileIdSaveInterval + log.Println("Setting file id sequence", m.FileIdSequence) + } else { + decoder := gob.NewDecoder(seqFile) + defer seqFile.Close() + decoder.Decode(&m.FileIdSequence) + log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval) + //in case the server stops between intervals + m.FileIdSequence += FileIdSaveInterval + } + return +} + +//count should be 1 or more +func (m *SequencerImpl) NextFileId(count int) (uint64, int) { + if count <= 0 { + return 0, 0 + } + m.sequenceLock.Lock() + defer m.sequenceLock.Unlock() + if m.fileIdCounter < uint64(count) { + m.fileIdCounter = FileIdSaveInterval + m.FileIdSequence += FileIdSaveInterval + m.saveSequence() + } + m.fileIdCounter = m.fileIdCounter - uint64(count) + return m.FileIdSequence - m.fileIdCounter, count +} +func (m *SequencerImpl) saveSequence() { + log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq")) + seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644) + if e != nil { + log.Fatalf("Sequence File Save [ERROR] %s\n", e) + } + defer seqFile.Close() + encoder := gob.NewEncoder(seqFile) + encoder.Encode(m.FileIdSequence) +} diff --git a/src/weed/shell.go b/src/weed/shell.go new file mode 100644 index 000000000..daf0b7e1f --- /dev/null +++ 
b/src/weed/shell.go @@ -0,0 +1,53 @@ +package main + +import ( + "bufio" + "fmt" + "os" +) + +func init() { + cmdShell.Run = runShell // break init cycle +} + +var cmdShell = &Command{ + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. + + `, +} + +var () + +func runShell(command *Command, args []string) bool { + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + o.WriteString("> ") + o.Flush() + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + o.WriteString(cmd) + } + return 0 + } + + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } + return true +} diff --git a/src/weed/storage/compact_map.go b/src/weed/storage/compact_map.go new file mode 100644 index 000000000..0b33961c4 --- /dev/null +++ b/src/weed/storage/compact_map.go @@ -0,0 +1,182 @@ +package storage + +import () + +type NeedleValue struct { + Key Key + Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 "Size of the data portion" +} + +const ( + batch = 100000 +) + +type Key uint64 + +type CompactSection struct { + values []NeedleValue + overflow map[Key]NeedleValue + start Key + end Key + counter int +} + +func NewCompactSection(start Key) CompactSection { + return CompactSection{ + values: make([]NeedleValue, batch), + overflow: make(map[Key]NeedleValue), + start: start, + } +} + +//return old entry size +func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 { + ret := uint32(0) + if key > cs.end { + cs.end = key + } + if i := cs.binarySearchValues(key); i >= 0 { + ret = cs.values[i].Size + //println("key", key, "old size", ret) + cs.values[i].Offset, cs.values[i].Size = offset, size + } else { + needOverflow := cs.counter >= batch + needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key + if needOverflow { + //println("start", cs.start, "counter", cs.counter, "key", key) + if oldValue, found := cs.overflow[key]; found { + ret = oldValue.Size + } + cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size} + } else { + p := &cs.values[cs.counter] + p.Key, p.Offset, p.Size = key, offset, size + //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) + cs.counter++ + } + } + return ret +} + +//return old entry size +func (cs *CompactSection) Delete(key Key) uint32 { + ret := uint32(0) + if i := cs.binarySearchValues(key); i >= 0 { + if cs.values[i].Size > 0 { + ret = cs.values[i].Size + cs.values[i].Size = 0 + } + } + if v, found := cs.overflow[key]; found { + delete(cs.overflow, key) + ret = v.Size + } + return ret +} +func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) { + if v, ok := cs.overflow[key]; ok { + return &v, true + } + if i := cs.binarySearchValues(key); i >= 0 { + return &cs.values[i], true + } + return nil, false +} +func (cs *CompactSection) binarySearchValues(key Key) int { + l, h := 0, cs.counter-1 + if h >= 0 && cs.values[h].Key < key { + return -2 + } + //println("looking for key", key) + for l <= h { + m := (l + h) / 2 + //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size) + if cs.values[m].Key < key { + l = m + 1 + } else if key < cs.values[m].Key { + h = m - 1 + } else { + //println("found", m) + return m + } + } + return -1 +} + +//This map assumes mostly inserting 
increasing keys +type CompactMap struct { + list []CompactSection +} + +func NewCompactMap() CompactMap { + return CompactMap{} +} + +func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + //println(x, "creating", len(cm.list), "section1, starting", key) + cm.list = append(cm.list, NewCompactSection(key)) + x = len(cm.list) - 1 + } + return cm.list[x].Set(key, offset, size) +} +func (cm *CompactMap) Delete(key Key) uint32 { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return uint32(0) + } + return cm.list[x].Delete(key) +} +func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) { + x := cm.binarySearchCompactSection(key) + if x < 0 { + return nil, false + } + return cm.list[x].Get(key) +} +func (cm *CompactMap) binarySearchCompactSection(key Key) int { + l, h := 0, len(cm.list)-1 + if h < 0 { + return -5 + } + if cm.list[h].start <= key { + if cm.list[h].counter < batch || key <= cm.list[h].end { + return h + } else { + return -4 + } + } + for l <= h { + m := (l + h) / 2 + if key < cm.list[m].start { + h = m - 1 + } else { // cm.list[m].start <= key + if cm.list[m+1].start <= key { + l = m + 1 + } else { + return m + } + } + } + return -3 +} + +func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { + for _, cs := range cm.list { + for _, v := range cs.overflow { + if err := visit(v); err != nil { + return err + } + } + for _, v := range cs.values { + if _, found := cs.overflow[v.Key]; !found { + if err := visit(v); err != nil { + return err + } + } + } + } + return nil +} diff --git a/src/weed/storage/compact_map_perf_test.go b/src/weed/storage/compact_map_perf_test.go new file mode 100644 index 000000000..b99356a73 --- /dev/null +++ b/src/weed/storage/compact_map_perf_test.go @@ -0,0 +1,43 @@ +package storage + +import ( + "log" + "os" + "weed/util" + "testing" +) + +func TestMemoryUsage(t *testing.T) { + + indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + if ie != nil { + log.Fatalln(ie) + } + LoadNewNeedleMap(indexFile) + +} + +func LoadNewNeedleMap(file *os.File) CompactMap { + m := NewCompactMap() + bytes := make([]byte, 16*1024) + count, e := file.Read(bytes) + if count > 0 { + fstat, _ := file.Stat() + log.Println("Loading index file", fstat.Name(), "size", fstat.Size()) + } + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + if offset > 0 { + m.Set(Key(key), offset, size) + } else { + //delete(m, key) + } + } + + count, e = file.Read(bytes) + } + return m +} diff --git a/src/weed/storage/compact_map_test.go b/src/weed/storage/compact_map_test.go new file mode 100644 index 000000000..e76e9578d --- /dev/null +++ b/src/weed/storage/compact_map_test.go @@ -0,0 +1,63 @@ +package storage + +import ( + "testing" +) + +func TestXYZ(t *testing.T) { + m := NewCompactMap() + for i := uint32(0); i < 100*batch; i += 2 { + m.Set(Key(i), i, i) + } + + for i := uint32(0); i < 100*batch; i += 37 { + m.Delete(Key(i)) + } + + for i := uint32(0); i < 10*batch; i += 3 { + m.Set(Key(i), i+11, i+5) + } + + // for i := uint32(0); i < 100; i++ { + // if v := m.Get(Key(i)); v != nil { + // println(i, "=", v.Key, v.Offset, v.Size) + // } + // } + + for i := uint32(0); i < 10*batch; i++ { + v, ok := m.Get(Key(i)) + if i%3 == 0 { + if !ok { + t.Fatal("key", i, "missing!") + } + if v.Size != i+5 { + t.Fatal("key", i, "size", v.Size) + 
} + } else if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + + for i := uint32(10 * batch); i < 100*batch; i++ { + v, ok := m.Get(Key(i)) + if i%37 == 0 { + if ok && v.Size > 0 { + t.Fatal("key", i, "should have been deleted needle value", v) + } + } else if i%2 == 0 { + if v == nil { + t.Fatal("key", i, "missing") + } + if v.Size != i { + t.Fatal("key", i, "size", v.Size) + } + } + } + +} diff --git a/src/weed/storage/compress.go b/src/weed/storage/compress.go new file mode 100644 index 000000000..256789c9c --- /dev/null +++ b/src/weed/storage/compress.go @@ -0,0 +1,57 @@ +package storage + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "io/ioutil" + "strings" +) + +/* +* Default more not to gzip since gzip can be done on client side. +*/ +func IsGzippable(ext, mtype string) bool { + if strings.HasPrefix(mtype, "text/") { + return true + } + switch ext { + case ".zip", ".rar", ".gz", ".bz2", ".xz": + return false + case ".pdf", ".txt", ".html", ".css", ".js", ".json": + return true + } + if strings.HasPrefix(mtype, "application/") { + if strings.HasSuffix(mtype, "xml") { + return true + } + if strings.HasSuffix(mtype, "script") { + return true + } + } + return false +} + +func GzipData(input []byte) ([]byte, error) { + buf := new(bytes.Buffer) + w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) + if _, err := w.Write(input); err != nil { + println("error compressing data:", err) + return nil, err + } + if err := w.Close(); err != nil { + println("error closing compressed data:", err) + return nil, err + } + return buf.Bytes(), nil +} +func UnGzipData(input []byte) ([]byte, error) { + buf := bytes.NewBuffer(input) + r, _ := gzip.NewReader(buf) + defer r.Close() + output, err := ioutil.ReadAll(r) + if err != nil { + println("error uncompressing data:", err) + } + return output, err +} diff --git a/src/weed/storage/crc.go b/src/weed/storage/crc.go new file mode 100644 index 000000000..198352e68 --- /dev/null +++ b/src/weed/storage/crc.go @@ -0,0 +1,21 @@ +package storage + +import ( + "hash/crc32" +) + +var table = crc32.MakeTable(crc32.Castagnoli) + +type CRC uint32 + +func NewCRC(b []byte) CRC { + return CRC(0).Update(b) +} + +func (c CRC) Update(b []byte) CRC { + return CRC(crc32.Update(uint32(c), table, b)) +} + +func (c CRC) Value() uint32 { + return uint32(c>>15|c<<17) + 0xa282ead8 +} diff --git a/src/weed/storage/needle.go b/src/weed/storage/needle.go new file mode 100644 index 000000000..23016f98b --- /dev/null +++ b/src/weed/storage/needle.go @@ -0,0 +1,132 @@ +package storage + +import ( + "encoding/hex" + "fmt" + "io/ioutil" + "mime" + "net/http" + "path" + "weed/util" + "strconv" + "strings" +) + +const ( + NeedleHeaderSize = 16 //should never change this + NeedlePaddingSize = 8 + NeedleChecksumSize = 4 +) + +type Needle struct { + Cookie uint32 "random number to mitigate brute force lookups" + Id uint64 "needle id" + Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime" + + DataSize uint32 "Data size" //version2 + Data []byte "The actual file data" + Flags byte "boolean flags" //version2 + NameSize uint8 //version2 + Name []byte "maximum 256 characters" //version2 + MimeSize uint8 //version2 + Mime []byte "maximum 256 characters" //version2 + + Checksum CRC "CRC32 to check integrity" + Padding []byte "Aligned to 8 bytes" +} + +func NewNeedle(r *http.Request) (n *Needle, fname string, e error) { + + n = 
new(Needle) + form, fe := r.MultipartReader() + if fe != nil { + fmt.Println("MultipartReader [ERROR]", fe) + e = fe + return + } + part, fe := form.NextPart() + if fe != nil { + fmt.Println("Reading Multi part [ERROR]", fe) + e = fe + return + } + fname = part.FileName() + fname = path.Base(fname) + data, _ := ioutil.ReadAll(part) + dotIndex := strings.LastIndex(fname, ".") + ext, mtype := "", "" + if dotIndex > 0 { + ext = fname[dotIndex:] + mtype = mime.TypeByExtension(ext) + } + contentType := part.Header.Get("Content-Type") + if contentType != "" && mtype != contentType && len(contentType) < 256 { + n.Mime = []byte(contentType) + n.SetHasMime() + mtype = contentType + } + if IsGzippable(ext, mtype) { + if data, e = GzipData(data); e != nil { + return + } + n.SetGzipped() + } + if ext == ".gz" { + n.SetGzipped() + } + if len(fname) < 256 { + if strings.HasSuffix(fname, ".gz") { + n.Name = []byte(fname[:len(fname)-3]) + } else { + n.Name = []byte(fname) + } + n.SetHasName() + } + + n.Data = data + n.Checksum = NewCRC(data) + + commaSep := strings.LastIndex(r.URL.Path, ",") + dotSep := strings.LastIndex(r.URL.Path, ".") + fid := r.URL.Path[commaSep+1:] + if dotSep > 0 { + fid = r.URL.Path[commaSep+1 : dotSep] + } + + n.ParsePath(fid) + + return +} +func (n *Needle) ParsePath(fid string) { + length := len(fid) + if length <= 8 { + if length > 0 { + println("Invalid fid", fid, "length", length) + } + return + } + delta := "" + deltaIndex := strings.LastIndex(fid, "_") + if deltaIndex > 0 { + fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] + } + n.Id, n.Cookie = ParseKeyHash(fid) + if delta != "" { + d, e := strconv.ParseUint(delta, 10, 64) + if e == nil { + n.Id += d + } + } +} + +func ParseKeyHash(key_hash_string string) (uint64, uint32) { + key_hash_bytes, khe := hex.DecodeString(key_hash_string) + key_hash_len := len(key_hash_bytes) + if khe != nil || key_hash_len <= 4 { + println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe) + return 0, 0 + } + key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) + hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) + return key, hash +} diff --git a/src/weed/storage/needle_map.go b/src/weed/storage/needle_map.go new file mode 100644 index 000000000..505dd36dd --- /dev/null +++ b/src/weed/storage/needle_map.go @@ -0,0 +1,99 @@ +package storage + +import ( + //"log" + "os" + "weed/util" +) + +type NeedleMap struct { + indexFile *os.File + m CompactMap + + //transient + bytes []byte + + deletionCounter int + fileCounter int + deletionByteCounter uint64 + fileByteCounter uint64 +} + +func NewNeedleMap(file *os.File) *NeedleMap { + nm := &NeedleMap{ + m: NewCompactMap(), + bytes: make([]byte, 16), + indexFile: file, + } + return nm +} + +const ( + RowsToRead = 1024 +) + +func LoadNeedleMap(file *os.File) *NeedleMap { + nm := NewNeedleMap(file) + bytes := make([]byte, 16*RowsToRead) + count, e := nm.indexFile.Read(bytes) + for count > 0 && e == nil { + for i := 0; i < count; i += 16 { + key := util.BytesToUint64(bytes[i : i+8]) + offset := util.BytesToUint32(bytes[i+8 : i+12]) + size := util.BytesToUint32(bytes[i+12 : i+16]) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } else { + oldSize := 
nm.m.Delete(Key(key)) + //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + } + + count, e = nm.indexFile.Read(bytes) + } + return nm +} + +func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { + oldSize := nm.m.Set(Key(key), offset, size) + util.Uint64toBytes(nm.bytes[0:8], key) + util.Uint32toBytes(nm.bytes[8:12], offset) + util.Uint32toBytes(nm.bytes[12:16], size) + nm.fileCounter++ + nm.fileByteCounter = nm.fileByteCounter + uint64(size) + if oldSize > 0 { + nm.deletionCounter++ + nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize) + } + return nm.indexFile.Write(nm.bytes) +} +func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) { + element, ok = nm.m.Get(Key(key)) + return +} +func (nm *NeedleMap) Delete(key uint64) { + nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key))) + util.Uint64toBytes(nm.bytes[0:8], key) + util.Uint32toBytes(nm.bytes[8:12], 0) + util.Uint32toBytes(nm.bytes[12:16], 0) + nm.indexFile.Write(nm.bytes) + nm.deletionCounter++ +} +func (nm *NeedleMap) Close() { + nm.indexFile.Close() +} +func (nm *NeedleMap) ContentSize() uint64 { + return nm.fileByteCounter +} +func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) { + return nm.m.Visit(visit) +} diff --git a/src/weed/storage/needle_read_write.go b/src/weed/storage/needle_read_write.go new file mode 100644 index 000000000..1dac6d5c4 --- /dev/null +++ b/src/weed/storage/needle_read_write.go @@ -0,0 +1,238 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "weed/util" +) + +const ( + FlagGzip = 0x01 + FlagHasName = 0x02 + FlagHasMime = 0x04 +) + +func (n *Needle) DiskSize() uint32 { + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize +} +func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { + if s, ok := w.(io.Seeker); ok { + if end, e := s.Seek(0, 1); e == nil { + defer func(s io.Seeker, off int64) { + if err != nil { + if _, e = s.Seek(off, 0); e != nil { + fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e) + } + } + }(s, end) + } else { + err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e) + return + } + } + switch version { + case Version1: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.Size = uint32(len(n.Data)) + size = n.Size + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return + case Version2: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) + if n.DataSize > 0 { + n.Size = 4 + n.DataSize + 1 + if n.HasName() { + n.Size = n.Size + 1 + uint32(n.NameSize) + } + if n.HasMime() { + n.Size = n.Size + 1 + uint32(n.MimeSize) + } + } + size = n.DataSize + util.Uint32toBytes(header[12:16], n.Size) + if _, err 
= w.Write(header); err != nil { + return + } + if n.DataSize > 0 { + util.Uint32toBytes(header[0:4], n.DataSize) + if _, err = w.Write(header[0:4]); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + util.Uint8toBytes(header[0:1], n.Flags) + if _, err = w.Write(header[0:1]); err != nil { + return + } + } + if n.HasName() { + util.Uint8toBytes(header[0:1], n.NameSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Name); err != nil { + return + } + } + if n.HasMime() { + util.Uint8toBytes(header[0:1], n.MimeSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Mime); err != nil { + return + } + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return n.DataSize, err + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) { + switch version { + case Version1: + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + n.readNeedleHeader(bytes) + n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + case Version2: + if size == 0 { + return 0, nil + } + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { + return 0, errors.New("File Entry Not Found!") + } + n.readNeedleHeader(bytes) + if n.Size != size { + return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size) + } + n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + } + return 0, fmt.Errorf("Unsupported Version! 
(%d)", version) +} +func (n *Needle) readNeedleHeader(bytes []byte) { + n.Cookie = util.BytesToUint32(bytes[0:4]) + n.Id = util.BytesToUint64(bytes[4:12]) + n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) +} +func (n *Needle) readNeedleDataVersion2(bytes []byte) { + index, lenBytes := 0, len(bytes) + if index < lenBytes { + n.DataSize = util.BytesToUint32(bytes[index : index+4]) + index = index + 4 + n.Data = bytes[index : index+int(n.DataSize)] + index = index + int(n.DataSize) + n.Flags = bytes[index] + index = index + 1 + } + if index < lenBytes && n.HasName() { + n.NameSize = uint8(bytes[index]) + index = index + 1 + n.Name = bytes[index : index+int(n.NameSize)] + index = index + int(n.NameSize) + } + if index < lenBytes && n.HasMime() { + n.MimeSize = uint8(bytes[index]) + index = index + 1 + n.Mime = bytes[index : index+int(n.MimeSize)] + } +} + +func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) { + n = new(Needle) + if version == Version1 || version == Version2 { + bytes := make([]byte, NeedleHeaderSize) + var count int + count, err = r.Read(bytes) + if count <= 0 || err != nil { + return nil, 0, err + } + n.readNeedleHeader(bytes) + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + bodyLength = n.Size + NeedleChecksumSize + padding + } + return +} + +//n should be a needle already read the header +//the input stream will read until next file entry +func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) { + if bodyLength <= 0 { + return nil + } + switch version { + case Version1: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.Data = bytes[:n.Size] + n.Checksum = NewCRC(n.Data) + case Version2: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.readNeedleDataVersion2(bytes[0:n.Size]) + n.Checksum = NewCRC(n.Data) + default: + err = fmt.Errorf("Unsupported Version! 
(%d)", version) + } + return +} + +func (n *Needle) IsGzipped() bool { + return n.Flags&FlagGzip > 0 +} +func (n *Needle) SetGzipped() { + n.Flags = n.Flags | FlagGzip +} +func (n *Needle) HasName() bool { + return n.Flags&FlagHasName > 0 +} +func (n *Needle) SetHasName() { + n.Flags = n.Flags | FlagHasName +} +func (n *Needle) HasMime() bool { + return n.Flags&FlagHasMime > 0 +} +func (n *Needle) SetHasMime() { + n.Flags = n.Flags | FlagHasMime +} diff --git a/src/weed/storage/replication_type.go b/src/weed/storage/replication_type.go new file mode 100644 index 000000000..0902d1016 --- /dev/null +++ b/src/weed/storage/replication_type.go @@ -0,0 +1,123 @@ +package storage + +import ( + "errors" +) + +type ReplicationType string + +const ( + Copy000 = ReplicationType("000") // single copy + Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center + Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center + Copy100 = ReplicationType("100") // 2 copies, each on different data center + Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center + Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center + LengthRelicationType = 6 + CopyNil = ReplicationType(255) // nil value +) + +func NewReplicationTypeFromString(t string) (ReplicationType, error) { + switch t { + case "000": + return Copy000, nil + case "001": + return Copy001, nil + case "010": + return Copy010, nil + case "100": + return Copy100, nil + case "110": + return Copy110, nil + case "200": + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + t) +} +func NewReplicationTypeFromByte(b byte) (ReplicationType, error) { + switch b { + case byte(000): + return Copy000, nil + case byte(001): + return Copy001, nil + case byte(010): + return Copy010, nil + case byte(100): + return Copy100, nil + case byte(110): + return Copy110, nil + case byte(200): + return Copy200, nil + } + return Copy000, errors.New("Unknown Replication Type:" + string(b)) +} + +func (r *ReplicationType) String() string { + switch *r { + case Copy000: + return "000" + case Copy001: + return "001" + case Copy010: + return "010" + case Copy100: + return "100" + case Copy110: + return "110" + case Copy200: + return "200" + } + return "000" +} +func (r *ReplicationType) Byte() byte { + switch *r { + case Copy000: + return byte(000) + case Copy001: + return byte(001) + case Copy010: + return byte(010) + case Copy100: + return byte(100) + case Copy110: + return byte(110) + case Copy200: + return byte(200) + } + return byte(000) +} + +func (repType ReplicationType) GetReplicationLevelIndex() int { + switch repType { + case Copy000: + return 0 + case Copy001: + return 1 + case Copy010: + return 2 + case Copy100: + return 3 + case Copy110: + return 4 + case Copy200: + return 5 + } + return -1 +} +func (repType ReplicationType) GetCopyCount() int { + switch repType { + case Copy000: + return 1 + case Copy001: + return 2 + case Copy010: + return 2 + case Copy100: + return 2 + case Copy110: + return 3 + case Copy200: + return 3 + } + return 0 +} diff --git a/src/weed/storage/sample.idx b/src/weed/storage/sample.idx Binary files differnew file mode 100644 index 000000000..44918b41d --- /dev/null +++ b/src/weed/storage/sample.idx diff --git a/src/weed/storage/store.go b/src/weed/storage/store.go new file mode 100644 index 000000000..b016a6491 --- /dev/null +++ b/src/weed/storage/store.go @@ -0,0 +1,204 @@ 
+package storage + +import ( + "encoding/json" + "errors" + "io/ioutil" + "log" + "net/url" + "weed/util" + "strconv" + "strings" +) + +type Store struct { + volumes map[VolumeId]*Volume + dir string + Port int + Ip string + PublicUrl string + MaxVolumeCount int + + masterNode string + connected bool + volumeSizeLimit uint64 //read from the master + +} + +func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) { + s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount} + s.volumes = make(map[VolumeId]*Volume) + s.loadExistingVolumes() + + log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes") + return +} +func (s *Store) AddVolume(volumeListString string, replicationType string) error { + rt, e := NewReplicationTypeFromString(replicationType) + if e != nil { + return e + } + for _, range_string := range strings.Split(volumeListString, ",") { + if strings.Index(range_string, "-") < 0 { + id_string := range_string + id, err := NewVolumeId(id_string) + if err != nil { + return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!") + } + e = s.addVolume(VolumeId(id), rt) + } else { + pair := strings.Split(range_string, "-") + start, start_err := strconv.ParseUint(pair[0], 10, 64) + if start_err != nil { + return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!") + } + end, end_err := strconv.ParseUint(pair[1], 10, 64) + if end_err != nil { + return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!") + } + for id := start; id <= end; id++ { + if err := s.addVolume(VolumeId(id), rt); err != nil { + e = err + } + } + } + } + return e +} +func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) { + if s.volumes[vid] != nil { + return errors.New("Volume Id " + vid.String() + " already exists!") + } + log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType) + s.volumes[vid], err = NewVolume(s.dir, vid, replicationType) + return err +} + +func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false + } + garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32) + if e != nil { + return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false + } + return nil, garbageThreshold < s.volumes[vid].garbageLevel() +} +func (s *Store) CompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + return s.volumes[vid].compact() +} +func (s *Store) CommitCompactVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + return s.volumes[vid].commitCompact() +} +func (s *Store) loadExistingVolumes() { + if dirs, err := ioutil.ReadDir(s.dir); err == nil { + for _, dir := range dirs { + name := dir.Name() + if !dir.IsDir() && strings.HasSuffix(name, ".dat") { + base := name[:len(name)-len(".dat")] + if vid, err := NewVolumeId(base); err == nil { + if s.volumes[vid] == nil { + if v, e := NewVolume(s.dir, vid, CopyNil); e == nil { + s.volumes[vid] = v + log.Println("In 
dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size()) + } + } + } + } + } + } +} +func (s *Store) Status() []*VolumeInfo { + var stats []*VolumeInfo + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = + VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + stats = append(stats, s) + } + return stats +} + +type JoinResult struct { + VolumeSizeLimit uint64 +} + +func (s *Store) SetMaster(mserver string) { + s.masterNode = mserver +} +func (s *Store) Join() error { + stats := new([]*VolumeInfo) + for k, v := range s.volumes { + s := new(VolumeInfo) + s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = + VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter + *stats = append(*stats, s) + } + bytes, _ := json.Marshal(stats) + values := make(url.Values) + if !s.connected { + values.Add("init", "true") + } + values.Add("port", strconv.Itoa(s.Port)) + values.Add("ip", s.Ip) + values.Add("publicUrl", s.PublicUrl) + values.Add("volumes", string(bytes)) + values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) + jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) + if err != nil { + return err + } + var ret JoinResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + s.volumeSizeLimit = ret.VolumeSizeLimit + s.connected = true + return nil +} +func (s *Store) Close() { + for _, v := range s.volumes { + v.Close() + } +} +func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) { + if v := s.volumes[i]; v != nil { + size, err = v.write(n) + if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() { + log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit) + s.Join() + } + return + } + log.Println("volume", i, "not found!") + return +} +func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) { + if v := s.volumes[i]; v != nil { + return v.delete(n) + } + return 0, nil +} +func (s *Store) Read(i VolumeId, n *Needle) (int, error) { + if v := s.volumes[i]; v != nil { + return v.read(n) + } + return 0, errors.New("Not Found") +} +func (s *Store) GetVolume(i VolumeId) *Volume { + return s.volumes[i] +} + +func (s *Store) HasVolume(i VolumeId) bool { + _, ok := s.volumes[i] + return ok +} diff --git a/src/weed/storage/volume.go b/src/weed/storage/volume.go new file mode 100644 index 000000000..707c6e6f8 --- /dev/null +++ b/src/weed/storage/volume.go @@ -0,0 +1,274 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "path" + "sync" +) + +const ( + SuperBlockSize = 8 +) + +type SuperBlock struct { + Version Version + ReplicaType ReplicationType +} + +func (s *SuperBlock) Bytes() []byte { + header := make([]byte, SuperBlockSize) + header[0] = byte(s.Version) + header[1] = s.ReplicaType.Byte() + return header +} + +type Volume struct { + Id VolumeId + dir string + dataFile *os.File + nm *NeedleMap + + SuperBlock + + accessLock sync.Mutex +} + +func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) { + v = &Volume{dir: dirname, Id: id} + v.SuperBlock = SuperBlock{ReplicaType: replicationType} + e = v.load(true) + return +} +func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) 
{ + v = &Volume{dir: dirname, Id: id} + v.SuperBlock = SuperBlock{ReplicaType: CopyNil} + e = v.load(false) + return +} +func (v *Volume) load(alsoLoadIndex bool) error { + var e error + fileName := path.Join(v.dir, v.Id.String()) + v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + if v.ReplicaType == CopyNil { + if e = v.readSuperBlock(); e != nil { + return e + } + } else { + v.maybeWriteSuperBlock() + } + if alsoLoadIndex { + indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if ie != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + v.nm = LoadNeedleMap(indexFile) + } + return nil +} +func (v *Volume) Version() Version { + return v.SuperBlock.Version +} +func (v *Volume) Size() int64 { + v.accessLock.Lock() + defer v.accessLock.Unlock() + stat, e := v.dataFile.Stat() + if e == nil { + return stat.Size() + } + fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error()) + return -1 +} +func (v *Volume) Close() { + v.accessLock.Lock() + defer v.accessLock.Unlock() + v.nm.Close() + v.dataFile.Close() +} +func (v *Volume) maybeWriteSuperBlock() { + stat, e := v.dataFile.Stat() + if e != nil { + fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e) + return + } + if stat.Size() == 0 { + v.SuperBlock.Version = CurrentVersion + v.dataFile.Write(v.SuperBlock.Bytes()) + } +} +func (v *Volume) readSuperBlock() (err error) { + v.dataFile.Seek(0, 0) + header := make([]byte, SuperBlockSize) + if _, e := v.dataFile.Read(header); e != nil { + return fmt.Errorf("cannot read superblock: %s", e) + } + v.SuperBlock, err = ParseSuperBlock(header) + return err +} +func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { + superBlock.Version = Version(header[0]) + if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil { + err = fmt.Errorf("cannot read replica type: %s", err) + } + return +} +func (v *Volume) NeedToReplicate() bool { + return v.ReplicaType.GetCopyCount() > 1 +} + +func (v *Volume) write(n *Needle) (size uint32, err error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + var offset int64 + if offset, err = v.dataFile.Seek(0, 2); err != nil { + return + } + if size, err = n.Append(v.dataFile, v.Version()); err != nil { + return + } + nv, ok := v.nm.Get(n.Id) + if !ok || int64(nv.Offset)*NeedlePaddingSize < offset { + _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size) + } + return +} +func (v *Volume) delete(n *Needle) (uint32, error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + nv, ok := v.nm.Get(n.Id) + //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) + if ok { + v.nm.Delete(n.Id) + v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0) + _, err := n.Append(v.dataFile, v.Version()) + return nv.Size, err + } + return 0, nil +} + +func (v *Volume) read(n *Needle) (int, error) { + v.accessLock.Lock() + defer v.accessLock.Unlock() + nv, ok := v.nm.Get(n.Id) + if ok && nv.Offset > 0 { + v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0) + return n.Read(v.dataFile, nv.Size, v.Version()) + } + return -1, errors.New("Not Found") +} + +func (v *Volume) garbageLevel() float64 { + return float64(v.nm.deletionByteCounter) / float64(v.ContentSize()) +} + +func (v *Volume) compact() error { + v.accessLock.Lock() + defer v.accessLock.Unlock() + + filePath := path.Join(v.dir, 
v.Id.String()) + return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx") +} +func (v *Volume) commitCompact() error { + v.accessLock.Lock() + defer v.accessLock.Unlock() + v.dataFile.Close() + var e error + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil { + return e + } + if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil { + return e + } + if e = v.load(true); e != nil { + return e + } + return nil +} + +func ScanVolumeFile(dirname string, id VolumeId, + visitSuperBlock func(SuperBlock) error, + visitNeedle func(n *Needle, offset uint32) error) (err error) { + var v *Volume + if v, err = LoadVolumeOnly(dirname, id); err != nil { + return + } + if err = visitSuperBlock(v.SuperBlock); err != nil { + return + } + + version := v.Version() + + offset := uint32(SuperBlockSize) + n, rest, e := ReadNeedleHeader(v.dataFile, version) + if e != nil { + err = fmt.Errorf("cannot read needle header: %s", e) + return + } + for n != nil { + if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil { + err = fmt.Errorf("cannot read needle body: %s", err) + return + } + if err = visitNeedle(n, offset); err != nil { + return + } + offset += NeedleHeaderSize + rest + if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header: %s", err) + } + } + + return +} + +func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) { + var ( + dst, idx *os.File + ) + if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { + return + } + defer dst.Close() + + if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil { + return + } + defer idx.Close() + + nm := NewNeedleMap(idx) + new_offset := uint32(SuperBlockSize) + + err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error { + _, err = dst.Write(superBlock.Bytes()) + return err + }, func(n *Needle, offset uint32) error { + nv, ok := v.nm.Get(n.Id) + //log.Println("file size is", n.Size, "rest", rest) + if ok && nv.Offset*NeedlePaddingSize == offset { + if nv.Size > 0 { + if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil { + return fmt.Errorf("cannot put needle: %s", err) + } + if _, err = n.Append(dst, v.Version()); err != nil { + return fmt.Errorf("cannot append needle: %s", err) + } + new_offset += n.DiskSize() + //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest) + } + } + return nil + }) + + return +} +func (v *Volume) ContentSize() uint64 { + return v.nm.fileByteCounter +} diff --git a/src/weed/storage/volume_id.go b/src/weed/storage/volume_id.go new file mode 100644 index 000000000..0333c6cf0 --- /dev/null +++ b/src/weed/storage/volume_id.go @@ -0,0 +1,18 @@ +package storage + +import ( + "strconv" +) + +type VolumeId uint32 + +func NewVolumeId(vid string) (VolumeId, error) { + volumeId, err := strconv.ParseUint(vid, 10, 64) + return VolumeId(volumeId), err +} +func (vid *VolumeId) String() string { + return strconv.FormatUint(uint64(*vid), 10) +} +func (vid *VolumeId) Next() VolumeId { + return VolumeId(uint32(*vid) + 1) +} diff --git a/src/weed/storage/volume_info.go b/src/weed/storage/volume_info.go new file mode 100644 index 000000000..e4c5f6ec4 --- /dev/null +++ b/src/weed/storage/volume_info.go @@ -0,0 +1,13 @@ +package storage + +import () + +type VolumeInfo 
struct { + Id VolumeId + Size uint64 + RepType ReplicationType + Version Version + FileCount int + DeleteCount int + DeletedByteCount uint64 +} diff --git a/src/weed/storage/volume_version.go b/src/weed/storage/volume_version.go new file mode 100644 index 000000000..9702ae904 --- /dev/null +++ b/src/weed/storage/volume_version.go @@ -0,0 +1,11 @@ +package storage + +import () + +type Version uint8 + +const ( + Version1 = Version(1) + Version2 = Version(2) + CurrentVersion = Version2 +) diff --git a/src/weed/topology/configuration.go b/src/weed/topology/configuration.go new file mode 100644 index 000000000..4c8424214 --- /dev/null +++ b/src/weed/topology/configuration.go @@ -0,0 +1,56 @@ +package topology + +import ( + "encoding/xml" +) + +type loc struct { + dcName string + rackName string +} +type rack struct { + Name string `xml:"name,attr"` + Ips []string `xml:"Ip"` +} +type dataCenter struct { + Name string `xml:"name,attr"` + Racks []rack `xml:"Rack"` +} +type topology struct { + DataCenters []dataCenter `xml:"DataCenter"` +} +type Configuration struct { + XMLName xml.Name `xml:"Configuration"` + Topo topology `xml:"Topology"` + ip2location map[string]loc +} + +func NewConfiguration(b []byte) (*Configuration, error) { + c := &Configuration{} + err := xml.Unmarshal(b, c) + c.ip2location = make(map[string]loc) + for _, dc := range c.Topo.DataCenters { + for _, rack := range dc.Racks { + for _, ip := range rack.Ips { + c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name} + } + } + } + return c, err +} + +func (c *Configuration) String() string { + if b, e := xml.MarshalIndent(c, " ", " "); e == nil { + return string(b) + } + return "" +} + +func (c *Configuration) Locate(ip string) (dc string, rack string) { + if c != nil && c.ip2location != nil { + if loc, ok := c.ip2location[ip]; ok { + return loc.dcName, loc.rackName + } + } + return "DefaultDataCenter", "DefaultRack" +} diff --git a/src/weed/topology/configuration_test.go b/src/weed/topology/configuration_test.go new file mode 100644 index 000000000..35d82c058 --- /dev/null +++ b/src/weed/topology/configuration_test.go @@ -0,0 +1,42 @@ +package topology + +import ( + "fmt" + "testing" +) + +func TestLoadConfiguration(t *testing.T) { + + confContent := ` + +<?xml version="1.0" encoding="UTF-8" ?> +<Configuration> + <Topology> + <DataCenter name="dc1"> + <Rack name="rack1"> + <Ip>192.168.1.1</Ip> + </Rack> + </DataCenter> + <DataCenter name="dc2"> + <Rack name="rack1"> + <Ip>192.168.1.2</Ip> + </Rack> + <Rack name="rack2"> + <Ip>192.168.1.3</Ip> + <Ip>192.168.1.4</Ip> + </Rack> + </DataCenter> + </Topology> +</Configuration> +` + c, err := NewConfiguration([]byte(confContent)) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%s", err.Error()) + } + + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { + t.Fatalf("unmarshal error:%s", c) + } +} diff --git a/src/weed/topology/data_center.go b/src/weed/topology/data_center.go new file mode 100644 index 000000000..a3b2b7d13 --- /dev/null +++ b/src/weed/topology/data_center.go @@ -0,0 +1,41 @@ +package topology + +import () + +type DataCenter struct { + NodeImpl +} + +func NewDataCenter(id string) *DataCenter { + dc := &DataCenter{} + dc.id = NodeId(id) + dc.nodeType = "DataCenter" + dc.children = make(map[NodeId]Node) + dc.NodeImpl.value = dc + return dc +} + +func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + 
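// no rack with this name exists yet: create it and link it under this data center +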
rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack +} + +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = dc.GetMaxVolumeCount() + m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m +} diff --git a/src/weed/topology/data_node.go b/src/weed/topology/data_node.go new file mode 100644 index 000000000..bea4729e2 --- /dev/null +++ b/src/weed/topology/data_node.go @@ -0,0 +1,60 @@ +package topology + +import ( + _ "fmt" + "weed/storage" + "strconv" +) + +type DataNode struct { + NodeImpl + volumes map[storage.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + Dead bool +} + +func NewDataNode(id string) *DataNode { + s := &DataNode{} + s.id = NodeId(id) + s.nodeType = "DataNode" + s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.NodeImpl.value = s + return s +} +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { + if _, ok := dn.volumes[v.Id]; !ok { + dn.volumes[v.Id] = v + dn.UpAdjustVolumeCountDelta(1) + dn.UpAdjustActiveVolumeCountDelta(1) + dn.UpAdjustMaxVolumeId(v.Id) + } else { + dn.volumes[v.Id] = v + } +} +func (dn *DataNode) GetTopology() *Topology { + p := dn.parent + for p.Parent() != nil { + p = p.Parent() + } + t := p.(*Topology) + return t +} +func (dn *DataNode) MatchLocation(ip string, port int) bool { + return dn.Ip == ip && dn.Port == port +} +func (dn *DataNode) Url() string { + return dn.Ip + ":" + strconv.Itoa(dn.Port) +} + +func (dn *DataNode) ToMap() interface{} { + ret := make(map[string]interface{}) + ret["Url"] = dn.Url() + ret["Volumes"] = dn.GetVolumeCount() + ret["Max"] = dn.GetMaxVolumeCount() + ret["Free"] = dn.FreeSpace() + ret["PublicUrl"] = dn.PublicUrl + return ret +} diff --git a/src/weed/topology/node.go b/src/weed/topology/node.go new file mode 100644 index 000000000..0bc85011c --- /dev/null +++ b/src/weed/topology/node.go @@ -0,0 +1,200 @@ +package topology + +import ( + "fmt" + "weed/storage" +) + +type NodeId string +type Node interface { + Id() NodeId + String() string + FreeSpace() int + ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) + UpAdjustVolumeCountDelta(volumeCountDelta int) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + UpAdjustMaxVolumeId(vid storage.VolumeId) + + GetVolumeCount() int + GetActiveVolumeCount() int + GetMaxVolumeCount() int + GetMaxVolumeId() storage.VolumeId + SetParent(Node) + LinkChildNode(node Node) + UnlinkChildNode(nodeId NodeId) + CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) + + IsDataNode() bool + Children() map[NodeId]Node + Parent() Node + + GetValue() interface{} //get reference to the topology,dc,rack,datanode +} +type NodeImpl struct { + id NodeId + volumeCount int + activeVolumeCount int + maxVolumeCount int + parent Node + children map[NodeId]Node + maxVolumeId storage.VolumeId + + //for rack, data center, topology + nodeType string + value interface{} +} + +func (n *NodeImpl) IsDataNode() bool { + return n.nodeType == "DataNode" +} +func (n *NodeImpl) IsRack() bool { + return n.nodeType == "Rack" +} +func (n *NodeImpl) IsDataCenter() bool { + return n.nodeType == "DataCenter" +} +func (n *NodeImpl) String() string { + if n.parent != nil { + return n.parent.String() + ":" + string(n.id) + } + return string(n.id) +} +func (n 
*NodeImpl) Id() NodeId { + return n.id +} +func (n *NodeImpl) FreeSpace() int { + return n.maxVolumeCount - n.volumeCount +} +func (n *NodeImpl) SetParent(node Node) { + n.parent = node +} +func (n *NodeImpl) Children() map[NodeId]Node { + return n.children +} +func (n *NodeImpl) Parent() Node { + return n.parent +} +func (n *NodeImpl) GetValue() interface{} { + return n.value +} +func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { + ret := false + var assignedNode *DataNode + for _, node := range n.children { + freeSpace := node.FreeSpace() + //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) + if freeSpace <= 0 { + continue + } + if r >= freeSpace { + r -= freeSpace + } else { + if node.IsDataNode() && node.FreeSpace() > 0 { + //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + return true, node.(*DataNode) + } + ret, assignedNode = node.ReserveOneVolume(r, vid) + if ret { + break + } + } + } + return ret, assignedNode +} + +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative + n.maxVolumeCount += maxVolumeCountDelta + if n.parent != nil { + n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative + n.volumeCount += volumeCountDelta + if n.parent != nil { + n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative + n.activeVolumeCount += activeVolumeCountDelta + if n.parent != nil { + n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative + if n.maxVolumeId < vid { + n.maxVolumeId = vid + if n.parent != nil { + n.parent.UpAdjustMaxVolumeId(vid) + } + } +} +func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { + return n.maxVolumeId +} +func (n *NodeImpl) GetVolumeCount() int { + return n.volumeCount +} +func (n *NodeImpl) GetActiveVolumeCount() int { + return n.activeVolumeCount +} +func (n *NodeImpl) GetMaxVolumeCount() int { + return n.maxVolumeCount +} + +func (n *NodeImpl) LinkChildNode(node Node) { + if n.children[node.Id()] == nil { + n.children[node.Id()] = node + n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) + n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) + node.SetParent(n) + fmt.Println(n, "adds child", node.Id()) + } +} + +func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { + node := n.children[nodeId] + node.SetParent(nil) + if node != nil { + delete(n.children, node.Id()) + n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) + n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) + } +} + +func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) { + if n.IsRack() { + for _, c := range n.Children() { + dn := c.(*DataNode) //can not cast n to DataNode + if dn.LastSeen < freshThreshHold { + if !dn.Dead { + dn.Dead = true + n.GetTopology().chanDeadDataNodes <- dn + } + } + for _, v := range dn.volumes { + if uint64(v.Size) >= volumeSizeLimit { + //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) + n.GetTopology().chanFullVolumes <- v + } + } + } + } 
else { + for _, c := range n.Children() { + c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit) + } + } +} + +func (n *NodeImpl) GetTopology() *Topology { + var p Node + p = n + for p.Parent() != nil { + p = p.Parent() + } + return p.GetValue().(*Topology) +} diff --git a/src/weed/topology/node_list.go b/src/weed/topology/node_list.go new file mode 100644 index 000000000..17ab1e0dc --- /dev/null +++ b/src/weed/topology/node_list.go @@ -0,0 +1,69 @@ +package topology + +import ( + "fmt" + "math/rand" + "weed/storage" +) + +type NodeList struct { + nodes map[NodeId]Node + except map[string]Node +} + +func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { + m := make(map[NodeId]Node, len(nodes)-len(except)) + for _, n := range nodes { + if except[n.String()] == nil { + m[n.Id()] = n + } + } + nl := &NodeList{nodes: m} + return nl +} + +func (nl *NodeList) FreeSpace() int { + freeSpace := 0 + for _, n := range nl.nodes { + freeSpace += n.FreeSpace() + } + return freeSpace +} + +func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { + var list []Node + for _, n := range nl.nodes { + if n.FreeSpace() >= min { + list = append(list, n) + } + } + if n > len(list) { + return nil, false + } + for i := n; i > 0; i-- { + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t + } + return list[len(list)-n:], true +} + +func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { + for _, node := range nl.nodes { + freeSpace := node.FreeSpace() + if randomVolumeIndex >= freeSpace { + randomVolumeIndex -= freeSpace + } else { + if node.IsDataNode() && node.FreeSpace() > 0 { + fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + return true, node.(*DataNode) + } + children := node.Children() + newNodeList := NewNodeList(children, nl.except) + return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) + } + } + return false, nil + +} diff --git a/src/weed/topology/node_list_test.go b/src/weed/topology/node_list_test.go new file mode 100644 index 000000000..2fb4fa970 --- /dev/null +++ b/src/weed/topology/node_list_test.go @@ -0,0 +1,39 @@ +package topology + +import ( + _ "fmt" + "strconv" + "testing" +) + +func TestXYZ(t *testing.T) { + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + for i := 0; i < 5; i++ { + dc := NewDataCenter("dc" + strconv.Itoa(i)) + dc.activeVolumeCount = i + dc.maxVolumeCount = 5 + topo.LinkChildNode(dc) + } + nl := NewNodeList(topo.Children(), nil) + + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") + } + + picked, ret = nl.RandomlyPickN(4) + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") + } + + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } + + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } + +} diff --git a/src/weed/topology/rack.go b/src/weed/topology/rack.go new file mode 100644 index 000000000..acc34417a --- /dev/null +++ b/src/weed/topology/rack.go @@ -0,0 +1,64 @@ +package topology + +import ( + "strconv" + "time" +) + +type Rack struct { + NodeImpl +} + +func NewRack(id string) *Rack { + r := &Rack{} + r.id = NodeId(id) + r.nodeType = "Rack" + r.children = make(map[NodeId]Node) + r.NodeImpl.value = r + return r +} + +func (r *Rack) FindDataNode(ip string, port int) *DataNode { 
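+ // scan this rack's children for a DataNode already registered at ip:port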
+ for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil +} +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + dn.LastSeen = time.Now().Unix() + if dn.Dead { + dn.Dead = false + r.GetTopology().chanRecoveredDataNodes <- dn + dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) + } + return dn + } + } + dn := NewDataNode(ip + ":" + strconv.Itoa(port)) + dn.Ip = ip + dn.Port = port + dn.PublicUrl = publicUrl + dn.maxVolumeCount = maxVolumeCount + dn.LastSeen = time.Now().Unix() + r.LinkChildNode(dn) + return dn +} + +func (rack *Rack) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = rack.GetMaxVolumeCount() + m["Free"] = rack.FreeSpace() + var dns []interface{} + for _, c := range rack.Children() { + dn := c.(*DataNode) + dns = append(dns, dn.ToMap()) + } + m["DataNodes"] = dns + return m +} diff --git a/src/weed/topology/topo_test.go b/src/weed/topology/topo_test.go new file mode 100644 index 000000000..b77fb8ad8 --- /dev/null +++ b/src/weed/topology/topo_test.go @@ -0,0 +1,127 @@ +package topology + +import ( + "encoding/json" + "fmt" + "math/rand" + "weed/storage" + "testing" + "time" +) + +var topologyLayout = ` +{ + "dc1":{ + "rack1":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":3 + }, + "server2":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":10 + } + }, + "rack2":{ + "server1":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":4 + }, + "server2":{ + "volumes":[], + "limit":4 + }, + "server3":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":2 + } + } + }, + "dc2":{ + }, + "dc3":{ + "rack2":{ + "server1":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":3, "size":12312}, + {"id":5, "size":12312} + ], + "limit":4 + } + } + } +} +` + +func setup(topologyLayout string) *Topology { + var data interface{} + err := json.Unmarshal([]byte(topologyLayout), &data) + if err != nil { + fmt.Println("error:", err) + } + + //need to connect all nodes first before server adding volumes + topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) + mTopology := data.(map[string]interface{}) + for dcKey, dcValue := range mTopology { + dc := NewDataCenter(dcKey) + dcMap := dcValue.(map[string]interface{}) + topo.LinkChildNode(dc) + for rackKey, rackValue := range dcMap { + rack := NewRack(rackKey) + rackMap := rackValue.(map[string]interface{}) + dc.LinkChildNode(rack) + for serverKey, serverValue := range rackMap { + server := NewDataNode(serverKey) + serverMap := serverValue.(map[string]interface{}) + rack.LinkChildNode(server) + for _, v := range serverMap["volumes"].([]interface{}) { + m := v.(map[string]interface{}) + vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} + server.AddOrUpdateVolume(vi) + } + server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + } + } + } + + return topo +} + +func TestRemoveDataCenter(t *testing.T) { + topo := setup(topologyLayout) + topo.UnlinkChildNode(NodeId("dc2")) + if topo.GetActiveVolumeCount() != 15 { + t.Fail() + } + 
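// dc3 holds three volumes, so unlinking it should leave 12 active volumes +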
topo.UnlinkChildNode(NodeId("dc3")) + if topo.GetActiveVolumeCount() != 12 { + t.Fail() + } +} + +func TestReserveOneVolume(t *testing.T) { + topo := setup(topologyLayout) + rand.Seed(time.Now().UnixNano()) + rand.Seed(1) + ret, node, vid := topo.RandomlyReserveOneVolume() + fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) + +} diff --git a/src/weed/topology/topology.go b/src/weed/topology/topology.go new file mode 100644 index 000000000..d0b12def4 --- /dev/null +++ b/src/weed/topology/topology.go @@ -0,0 +1,148 @@ +package topology + +import ( + "errors" + "io/ioutil" + "math/rand" + "weed/directory" + "weed/sequence" + "weed/storage" +) + +type Topology struct { + NodeImpl + + //transient vid~servers mapping for each replication type + replicaType2VolumeLayout []*VolumeLayout + + pulse int64 + + volumeSizeLimit uint64 + + sequence sequence.Sequencer + + chanDeadDataNodes chan *DataNode + chanRecoveredDataNodes chan *DataNode + chanFullVolumes chan storage.VolumeInfo + + configuration *Configuration +} + +func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology { + t := &Topology{} + t.id = NodeId(id) + t.nodeType = "Topology" + t.NodeImpl.value = t + t.children = make(map[NodeId]Node) + t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + t.pulse = int64(pulse) + t.volumeSizeLimit = volumeSizeLimit + + t.sequence = sequence.NewSequencer(dirname, sequenceFilename) + + t.chanDeadDataNodes = make(chan *DataNode) + t.chanRecoveredDataNodes = make(chan *DataNode) + t.chanFullVolumes = make(chan storage.VolumeInfo) + + t.loadConfiguration(confFile) + + return t +} + +func (t *Topology) loadConfiguration(configurationFile string) error { + b, e := ioutil.ReadFile(configurationFile) + if e == nil { + t.configuration, e = NewConfiguration(b) + } + return e +} + +func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { + for _, vl := range t.replicaType2VolumeLayout { + if vl != nil { + if list := vl.Lookup(vid); list != nil { + return list + } + } + } + return nil +} + +func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { + if t.FreeSpace() <= 0 { + return false, nil, nil + } + vid := t.NextVolumeId() + ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) + return ret, node, &vid +} + +func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) { + freeSpace := t.FreeSpace() + for _, node := range except { + freeSpace -= node.FreeSpace() + } + if freeSpace <= 0 { + return false, nil, nil + } + vid := t.NextVolumeId() + ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) + return ret, node, &vid +} + +func (t *Topology) NextVolumeId() storage.VolumeId { + vid := t.GetMaxVolumeId() + return vid.Next() +} + +func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) + if err != nil { + return "", 0, nil, errors.New("No writable volumes avalable!") + } + fileId, count := t.sequence.NextFileId(count) + return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil +} + +func 
(t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + return t.replicaType2VolumeLayout[replicationTypeIndex] +} + +func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { + t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) +} + +func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { + dcName, rackName := t.configuration.Locate(ip) + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + dn := rack.FindDataNode(ip, port) + if init && dn != nil { + t.UnRegisterDataNode(dn) + } + dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) + for _, v := range volumeInfos { + dn.AddOrUpdateVolume(v) + t.RegisterVolumeLayout(&v, dn) + } +} + +func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { + for _, c := range t.Children() { + dc := c.(*DataCenter) + if string(dc.Id()) == dcName { + return dc + } + } + dc := NewDataCenter(dcName) + t.LinkChildNode(dc) + return dc +} diff --git a/src/weed/topology/topology_compact.go b/src/weed/topology/topology_compact.go new file mode 100644 index 000000000..c2b85fe63 --- /dev/null +++ b/src/weed/topology/topology_compact.go @@ -0,0 +1,150 @@ +package topology + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "weed/storage" + "weed/util" + "time" +) + +func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url()) + if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { + //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e) + ch <- false + } else { + //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret) + ch <- ret + } + }(index, dn.Url(), vid) + } + isCheckSuccess := true + for _ = range locationlist.list { + select { + case canVacuum := <-ch: + isCheckSuccess = isCheckSuccess && canVacuum + case <-time.After(30 * time.Minute): + isCheckSuccess = false + break + } + } + return isCheckSuccess +} +func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + vl.removeFromWritable(vid) + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + fmt.Println(index, "Start vacuuming", vid, "on", dn.Url()) + if e := vacuumVolume_Compact(url, vid); e != nil { + fmt.Println(index, "Error when vacuuming", vid, "on", url, e) + ch <- false + } else { + fmt.Println(index, "Complete vacuuming", vid, "on", url) + ch <- true + } + }(index, dn.Url(), vid) + } + isVacuumSuccess := true + for _ = range locationlist.list { + select { + case _ = <-ch: + case <-time.After(30 * time.Minute): + isVacuumSuccess = false + break + } + } + return isVacuumSuccess +} +func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + isCommitSuccess := true + for _, dn := range locationlist.list { + fmt.Println("Start Commiting vacuum", vid, 
"on", dn.Url()) + if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { + fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e) + isCommitSuccess = false + } else { + fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url()) + } + } + if isCommitSuccess { + vl.setVolumeWritable(vid) + } + return isCommitSuccess +} +func (t *Topology) Vacuum(garbageThreshold string) int { + for _, vl := range t.replicaType2VolumeLayout { + if vl != nil { + for vid, locationlist := range vl.vid2location { + if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(vl, vid, locationlist) { + batchVacuumVolumeCommit(vl, vid, locationlist) + } + } + } + } + } + return 0 +} + +type VacuumVolumeResult struct { + Result bool + Error string +} + +func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("garbageThreshold", garbageThreshold) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) + if err != nil { + fmt.Println("parameters:", values) + return err, false + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err, false + } + if ret.Error != "" { + return errors.New(ret.Error), false + } + return nil, ret.Result +} +func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { + values := make(url.Values) + values.Add("volume", vid.String()) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values) + if err != nil { + return err + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} +func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { + values := make(url.Values) + values.Add("volume", vid.String()) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values) + if err != nil { + return err + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} diff --git a/src/weed/topology/topology_event_handling.go b/src/weed/topology/topology_event_handling.go new file mode 100644 index 000000000..6afd82dde --- /dev/null +++ b/src/weed/topology/topology_event_handling.go @@ -0,0 +1,67 @@ +package topology + +import ( + "fmt" + "math/rand" + "weed/storage" + "time" +) + +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { + go func() { + for { + freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func(garbageThreshold string) { + c := time.Tick(15 * time.Minute) + for _ = range c { + t.Vacuum(garbageThreshold) + } + }(garbageThreshold) + go func() { + for { + select { + case v := <-t.chanFullVolumes: + t.SetVolumeCapacityFull(v) + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + fmt.Println("DataNode", dn, "is back alive!") + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + fmt.Println("DataNode", dn, "is dead!") + } + } + }() +} +func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { + vl := t.GetVolumeLayout(volumeInfo.RepType) + 
if !vl.SetVolumeCapacityFull(volumeInfo.Id) { + return false + } + for _, dn := range vl.vid2location[volumeInfo.Id].list { + dn.UpAdjustActiveVolumeCountDelta(-1) + } + return true +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.volumes { + fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) + vl := t.GetVolumeLayout(v.RepType) + vl.SetVolumeUnavailable(dn, v.Id) + } + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) + dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + dn.Parent().UnlinkChildNode(dn.Id()) +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.volumes { + vl := t.GetVolumeLayout(v.RepType) + if vl.isWritable(&v) { + vl.SetVolumeAvailable(dn, v.Id) + } + } +} diff --git a/src/weed/topology/topology_map.go b/src/weed/topology/topology_map.go new file mode 100644 index 000000000..b416ee943 --- /dev/null +++ b/src/weed/topology/topology_map.go @@ -0,0 +1,50 @@ +package topology + +import () + +func (t *Topology) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = t.GetMaxVolumeCount() + m["Free"] = t.FreeSpace() + var dcs []interface{} + for _, c := range t.Children() { + dc := c.(*DataCenter) + dcs = append(dcs, dc.ToMap()) + } + m["DataCenters"] = dcs + var layouts []interface{} + for _, layout := range t.replicaType2VolumeLayout { + if layout != nil { + layouts = append(layouts, layout.ToMap()) + } + } + m["layouts"] = layouts + return m +} + +func (t *Topology) ToVolumeMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = t.GetMaxVolumeCount() + m["Free"] = t.FreeSpace() + dcs := make(map[NodeId]interface{}) + for _, c := range t.Children() { + dc := c.(*DataCenter) + racks := make(map[NodeId]interface{}) + for _, r := range dc.Children() { + rack := r.(*Rack) + dataNodes := make(map[NodeId]interface{}) + for _, d := range rack.Children() { + dn := d.(*DataNode) + var volumes []interface{} + for _, v := range dn.volumes { + volumes = append(volumes, v) + } + dataNodes[d.Id()] = volumes + } + racks[r.Id()] = dataNodes + } + dcs[dc.Id()] = racks + } + m["DataCenters"] = dcs + return m +} diff --git a/src/weed/topology/volume_layout.go b/src/weed/topology/volume_layout.go new file mode 100644 index 000000000..c144c1861 --- /dev/null +++ b/src/weed/topology/volume_layout.go @@ -0,0 +1,116 @@ +package topology + +import ( + "errors" + "fmt" + "math/rand" + "weed/storage" +) + +type VolumeLayout struct { + repType storage.ReplicationType + vid2location map[storage.VolumeId]*VolumeLocationList + writables []storage.VolumeId // transient array of writable volume id + pulse int64 + volumeSizeLimit uint64 +} + +func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { + return &VolumeLayout{ + repType: repType, + vid2location: make(map[storage.VolumeId]*VolumeLocationList), + writables: *new([]storage.VolumeId), + pulse: pulse, + volumeSizeLimit: volumeSizeLimit, + } +} + +func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + if _, ok := vl.vid2location[v.Id]; !ok { + vl.vid2location[v.Id] = NewVolumeLocationList() + } + if vl.vid2location[v.Id].Add(dn) { + if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { + if vl.isWritable(v) { + vl.writables = append(vl.writables, v.Id) + } + } + } +} + +func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { + return uint64(v.Size) < vl.volumeSizeLimit && 
v.Version == storage.CurrentVersion +} + +func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { + return vl.vid2location[vid].list +} + +func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { + len_writers := len(vl.writables) + if len_writers <= 0 { + fmt.Println("No more writable volumes!") + return nil, 0, nil, errors.New("No more writable volumes!") + } + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return &vid, count, locationList, nil + } + return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") +} + +func (vl *VolumeLayout) GetActiveVolumeCount() int { + return len(vl.writables) +} + +func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { + for i, v := range vl.writables { + if v == vid { + fmt.Println("Volume", vid, "becomes unwritable") + vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) + return true + } + } + return false +} +func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { + for _, v := range vl.writables { + if v == vid { + return false + } + } + fmt.Println("Volume", vid, "becomes writable") + vl.writables = append(vl.writables, vid) + return true +} + +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Remove(dn) { + if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { + fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) + return vl.removeFromWritable(vid) + } + } + return false +} +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Add(dn) { + if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + return vl.setVolumeWritable(vid) + } + } + return false +} + +func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { + return vl.removeFromWritable(vid) +} + +func (vl *VolumeLayout) ToMap() interface{} { + m := make(map[string]interface{}) + m["replication"] = vl.repType.String() + m["writables"] = vl.writables + //m["locations"] = vl.vid2location + return m +} diff --git a/src/weed/topology/volume_location.go b/src/weed/topology/volume_location.go new file mode 100644 index 000000000..507a240b5 --- /dev/null +++ b/src/weed/topology/volume_location.go @@ -0,0 +1,58 @@ +package topology + +import () + +type VolumeLocationList struct { + list []*DataNode +} + +func NewVolumeLocationList() *VolumeLocationList { + return &VolumeLocationList{} +} + +func (dnll *VolumeLocationList) Head() *DataNode { + return dnll.list[0] +} + +func (dnll *VolumeLocationList) Length() int { + return len(dnll.list) +} + +func (dnll *VolumeLocationList) Add(loc *DataNode) bool { + for _, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + return false + } + } + dnll.list = append(dnll.list, loc) + return true +} + +func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) 
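+ // loc matched an existing entry and has been removed from the list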
+ return true + } + } + return false +} + +func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { + var changed bool + for _, dnl := range dnll.list { + if dnl.LastSeen < freshThreshHold { + changed = true + break + } + } + if changed { + var l []*DataNode + for _, dnl := range dnll.list { + if dnl.LastSeen >= freshThreshHold { + l = append(l, dnl) + } + } + dnll.list = l + } +} diff --git a/src/weed/upload.go b/src/weed/upload.go new file mode 100644 index 000000000..bc1361cc5 --- /dev/null +++ b/src/weed/upload.go @@ -0,0 +1,113 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "os" + "path" + "weed/operation" + "weed/util" + "strconv" +) + +var uploadReplication *string + +func init() { + cmdUpload.Run = runUpload // break init cycle + cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") + server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location") + uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)") +} + +var cmdUpload = &Command{ + UsageLine: "upload -server=localhost:9333 file1 [file2 file3]", + Short: "upload one or a list of files", + Long: `upload one or a list of files. + It uses consecutive file keys for the list of files. + e.g. If the file1 uses key k, file2 can be read via k_1 + + `, +} + +type AssignResult struct { + Fid string "fid" + Url string "url" + PublicUrl string "publicUrl" + Count int + Error string "error" +} + +func assign(count int) (*AssignResult, error) { + values := make(url.Values) + values.Add("count", strconv.Itoa(count)) + values.Add("replication", *uploadReplication) + jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values) + debug("assign result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret AssignResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + if ret.Count <= 0 { + return nil, errors.New(ret.Error) + } + return &ret, nil +} + +func upload(filename string, server string, fid string) (int, error) { + debug("Start uploading file:", filename) + fh, err := os.Open(filename) + if err != nil { + debug("Failed to open file:", filename) + return 0, err + } + ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh) + if e != nil { + return 0, e + } + return ret.Size, e +} + +type SubmitResult struct { + Fid string "fid" + Size int "size" + Error string "error" +} + +func submit(files []string) []SubmitResult { + ret, err := assign(len(files)) + if err != nil { + fmt.Println(err) + return nil + } + results := make([]SubmitResult, len(files)) + for index, file := range files { + fid := ret.Fid + if index > 0 { + fid = fid + "_" + strconv.Itoa(index) + } + results[index].Size, err = upload(file, ret.PublicUrl, fid) + if err != nil { + fid = "" + results[index].Error = err.Error() + } + results[index].Fid = fid + } + return results +} + +func runUpload(cmd *Command, args []string) bool { + *IsDebug = true + if len(cmdUpload.Flag.Args()) == 0 { + return false + } + results := submit(args) + bytes, _ := json.Marshal(results) + fmt.Print(string(bytes)) + return true +} diff --git a/src/weed/util/bytes.go b/src/weed/util/bytes.go new file mode 100644 index 000000000..6cc3d7018 --- /dev/null +++ b/src/weed/util/bytes.go @@ -0,0 +1,33 @@ +package util + +func BytesToUint64(b []byte) (v uint64) { + length := uint(len(b)) + for i := uint(0); i < length-1; i++ { + v += uint64(b[i]) + v <<= 8 + } + v += uint64(b[length-1]) + 
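// v now holds the big-endian interpretation of b +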
return +} +func BytesToUint32(b []byte) (v uint32) { + length := uint(len(b)) + for i := uint(0); i < length-1; i++ { + v += uint32(b[i]) + v <<= 8 + } + v += uint32(b[length-1]) + return +} +func Uint64toBytes(b []byte, v uint64) { + for i := uint(0); i < 8; i++ { + b[7-i] = byte(v >> (i * 8)) + } +} +func Uint32toBytes(b []byte, v uint32) { + for i := uint(0); i < 4; i++ { + b[3-i] = byte(v >> (i * 8)) + } +} +func Uint8toBytes(b []byte, v uint8) { + b[0] = byte(v) +} diff --git a/src/weed/util/config.go b/src/weed/util/config.go new file mode 100644 index 000000000..6ac8a3a65 --- /dev/null +++ b/src/weed/util/config.go @@ -0,0 +1,128 @@ +// Copyright 2011 Numerotron Inc. +// Use of this source code is governed by an MIT-style license +// that can be found in the LICENSE file. +// +// Developed at www.stathat.com by Patrick Crosby +// Contact us on twitter with any questions: twitter.com/stat_hat + +// The jconfig package provides a simple, basic configuration file parser using JSON. +package util + +import ( + "bytes" + "encoding/json" + "log" + "os" +) + +type Config struct { + data map[string]interface{} + filename string +} + +func newConfig() *Config { + result := new(Config) + result.data = make(map[string]interface{}) + return result +} + +// Loads config information from a JSON file +func LoadConfig(filename string) *Config { + result := newConfig() + result.filename = filename + err := result.parse() + if err != nil { + log.Fatalf("error loading config file %s: %s", filename, err) + } + return result +} + +// Loads config information from a JSON string +func LoadConfigString(s string) *Config { + result := newConfig() + err := json.Unmarshal([]byte(s), &result.data) + if err != nil { + log.Fatalf("error parsing config string %s: %s", s, err) + } + return result +} + +func (c *Config) StringMerge(s string) { + next := LoadConfigString(s) + c.merge(next.data) +} + +func (c *Config) LoadMerge(filename string) { + next := LoadConfig(filename) + c.merge(next.data) +} + +func (c *Config) merge(ndata map[string]interface{}) { + for k, v := range ndata { + c.data[k] = v + } +} + +func (c *Config) parse() error { + f, err := os.Open(c.filename) + if err != nil { + return err + } + defer f.Close() + b := new(bytes.Buffer) + _, err = b.ReadFrom(f) + if err != nil { + return err + } + err = json.Unmarshal(b.Bytes(), &c.data) + if err != nil { + return err + } + + return nil +} + +// Returns a string for the config variable key +func (c *Config) GetString(key string) string { + result, present := c.data[key] + if !present { + return "" + } + return result.(string) +} + +// Returns an int for the config variable key +func (c *Config) GetInt(key string) int { + x, ok := c.data[key] + if !ok { + return -1 + } + return int(x.(float64)) +} + +// Returns a float for the config variable key +func (c *Config) GetFloat(key string) float64 { + x, ok := c.data[key] + if !ok { + return -1 + } + return x.(float64) +} + +// Returns a bool for the config variable key +func (c *Config) GetBool(key string) bool { + x, ok := c.data[key] + if !ok { + return false + } + return x.(bool) +} + +// Returns an array for the config variable key +func (c *Config) GetArray(key string) []interface{} { + result, present := c.data[key] + if !present { + return []interface{}(nil) + } + return result.([]interface{}) +} diff --git a/src/weed/util/parse.go b/src/weed/util/parse.go new file mode 100644 index 000000000..930da9522 --- /dev/null +++ b/src/weed/util/parse.go @@ -0,0 +1,16 @@ +package util + +import ( + "strconv" 
+) + +func ParseInt(text string, defaultValue int) int { + count, parseError := strconv.ParseUint(text, 10, 64) + if parseError != nil { + if len(text) > 0 { + return 0 + } + return defaultValue + } + return int(count) +} diff --git a/src/weed/util/post.go b/src/weed/util/post.go new file mode 100644 index 000000000..6e6ab0003 --- /dev/null +++ b/src/weed/util/post.go @@ -0,0 +1,23 @@ +package util + +import ( + "io/ioutil" + "log" + "net/http" + "net/url" +) + +func Post(url string, values url.Values) ([]byte, error) { + r, err := http.PostForm(url, values) + if err != nil { + log.Println("post to", url, err) + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Println("read post result from", url, err) + return nil, err + } + return b, nil +} diff --git a/src/weed/version.go b/src/weed/version.go new file mode 100644 index 000000000..b418126a4 --- /dev/null +++ b/src/weed/version.go @@ -0,0 +1,26 @@ +package main + +import ( + "fmt" + "runtime" +) + +const ( + VERSION = "0.28 beta" +) + +var cmdVersion = &Command{ + Run: runVersion, + UsageLine: "version", + Short: "print Weed File System version", + Long: `Version prints the Weed File System version`, +} + +func runVersion(cmd *Command, args []string) bool { + if len(args) != 0 { + cmd.Usage() + } + + fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) + return true +} diff --git a/src/weed/volume.go b/src/weed/volume.go new file mode 100644 index 000000000..483ead145 --- /dev/null +++ b/src/weed/volume.go @@ -0,0 +1,378 @@ +package main + +import ( + "bytes" + "log" + "math/rand" + "mime" + "net/http" + "os" + "weed/operation" + "weed/storage" + "runtime" + "strconv" + "strings" + "time" +) + +func init() { + cmdVolume.Run = runVolume // break init cycle + cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdVolume = &Command{ + UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333", + Short: "start a volume server", + Long: `start a volume server to provide storage spaces + + `, +} + +var ( + vport = cmdVolume.Flag.Int("port", 8080, "http listen port") + volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files") + ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name") + publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>") + masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting") + maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes") + vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 
0 means all available CPUs") + + store *storage.Store +) + +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") + +func statusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = store.Status() + writeJson(w, r, m) +} +func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) +} +func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { + err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) + if err == nil { + writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) + } else { + writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + } + debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) +} +func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { + err := store.CompactVolume(r.FormValue("volume")) + if err == nil { + writeJson(w, r, map[string]string{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("compacted volume =", r.FormValue("volume"), ", error =", err) +} +func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { + err := store.CommitCompactVolume(r.FormValue("volume")) + if err == nil { + writeJson(w, r, map[string]interface{}{"error": ""}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } + debug("commit compact volume =", r.FormValue("volume"), ", error =", err) +} +func storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + GetHandler(w, r) + case "DELETE": + DeleteHandler(w, r) + case "POST": + PostHandler(w, r) + } +} +func GetHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, ext := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + n.ParsePath(fid) + + debug("volume", volumeId, "reading", n) + if !store.HasVolume(volumeId) { + lookupResult, err := operation.Lookup(*masterNode, volumeId) + debug("volume", volumeId, "found on", lookupResult, "error", err) + if err == nil { + http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + debug("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + return + } + cookie := n.Cookie + count, e := store.Read(volumeId, n) + debug("read bytes", count, "error", e) + if e != nil || count <= 0 { + debug("read error:", e, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if n.Cookie != cookie { + log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(http.StatusNotFound) + return + } + if n.NameSize > 0 { + fname := string(n.Name) + dotIndex := strings.LastIndex(fname, ".") + if dotIndex > 0 { + ext = fname[dotIndex:] + } + } + mtype := "" + if ext != "" { + mtype = mime.TypeByExtension(ext) + } + if n.MimeSize > 0 { + mtype = string(n.Mime) + } + if mtype != "" { + w.Header().Set("Content-Type", mtype) + } + if n.NameSize > 
0 { + w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name))) + } + if ext != ".gz" { + if n.IsGzipped() { + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = storage.UnGzipData(n.Data); err != nil { + debug("lookup error:", err, r.URL.Path) + } + } + } + } + w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) + w.Write(n.Data) +} +func PostHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, e := storage.NewVolumeId(vid) + if e != nil { + writeJson(w, r, e) + } else { + needle, filename, ne := storage.NewNeedle(r) + if ne != nil { + writeJson(w, r, ne) + } else { + ret, err := store.Write(volumeId, needle) + errorStatus := "" + needToReplicate := !store.HasVolume(volumeId) + if err != nil { + errorStatus = "Failed to write to local disk (" + err.Error() + ")" + } else if ret > 0 { + needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() + } else { + errorStatus = "Failed to write to local disk" + } + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(volumeId, func(location operation.Location) bool { + _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() + } + } + } + m := make(map[string]interface{}) + if errorStatus == "" { + w.WriteHeader(http.StatusCreated) + } else { + store.Delete(volumeId, needle) + distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) + w.WriteHeader(http.StatusInternalServerError) + m["error"] = errorStatus + } + m["size"] = ret + writeJson(w, r, m) + } + } +} +func DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, _ := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + debug("deleting", n) + + cookie := n.Cookie + count, ok := store.Read(volumeId, n) + + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJson(w, r, m) + return + } + + if n.Cookie != cookie { + log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + ret, err := store.Delete(volumeId, n) + if err != nil { + log.Printf("delete error: %s\n", err) + return + } + + needToReplicate := !store.HasVolume(volumeId) + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "standard" { + if !distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) { + ret = 0 + } + } + } + + if ret != 0 { + w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJson(w, r, m) +} + +func parseURLPath(path string) (vid, fid, ext string) { + + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex 
<= 0 { + if "favicon.ico" != path[sepIndex+1:] { + log.Println("unknown file id", path[sepIndex+1:]) + } + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + return +} + +func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { + if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { + length := 0 + selfUrl := (*ip + ":" + strconv.Itoa(*vport)) + results := make(chan bool) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + length++ + go func(location operation.Location, results chan bool) { + results <- op(location) + }(location, results) + } + } + ret := true + for i := 0; i < length; i++ { + ret = ret && <-results + } + return ret + } else { + log.Println("Failed to lookup for", volumeId, lookupErr.Error()) + } + return false +} + +func runVolume(cmd *Command, args []string) bool { + if *vMaxCpu < 1 { + *vMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*vMaxCpu) + fileInfo, err := os.Stat(*volumeFolder) + if err != nil { + log.Fatalf("No Existing Folder:%s", *volumeFolder) + } + if !fileInfo.IsDir() { + log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder) + } + perm := fileInfo.Mode().Perm() + log.Println("Volume Folder permission:", perm) + + if *publicUrl == "" { + *publicUrl = *ip + ":" + strconv.Itoa(*vport) + } + + store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount) + defer store.Close() + http.HandleFunc("/", storeHandler) + http.HandleFunc("/status", statusHandler) + http.HandleFunc("/admin/assign_volume", assignVolumeHandler) + http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler) + http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler) + http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler) + + go func() { + connected := true + store.SetMaster(*masterNode) + for { + err := store.Join() + if err == nil { + if !connected { + connected = true + log.Println("Reconnected with master") + } + } else { + if connected { + connected = false + } + } + time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + log.Println("store joined at", *masterNode) + + log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) + srv := &http.Server{ + Addr: ":" + strconv.Itoa(*vport), + Handler: http.DefaultServeMux, + ReadTimeout: (time.Duration(*vReadTimeout) * time.Second), + } + e := srv.ListenAndServe() + if e != nil { + log.Fatalf("Fail to start:%s", e.Error()) + } + return true +} diff --git a/src/weed/weed.go b/src/weed/weed.go new file mode 100644 index 000000000..c03cb68ac --- /dev/null +++ b/src/weed/weed.go @@ -0,0 +1,199 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "math/rand" + "net/http" + "os" + "strings" + "sync" + "text/template" + "time" + "unicode" + "unicode/utf8" +) + +var IsDebug *bool +var server *string + +var commands = []*Command{ + cmdFix, + cmdMaster, + cmdUpload, + cmdShell, + cmdVersion, + cmdVolume, + cmdExport, +} + +var exitStatus = 0 +var exitMu sync.Mutex + +func setExitStatus(n int) { + exitMu.Lock() + if exitStatus < n { + exitStatus = n + } + exitMu.Unlock() +} + +func main() { + rand.Seed(time.Now().UnixNano()) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + 
if len(args) < 1 { + usage() + } + + if args[0] == "help" { + help(args[1:]) + for _, cmd := range commands { + if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil { + fmt.Fprintf(os.Stderr, "Default Parameters:\n") + cmd.Flag.PrintDefaults() + } + } + return + } + + for _, cmd := range commands { + if cmd.Name() == args[0] && cmd.Run != nil { + cmd.Flag.Usage = func() { cmd.Usage() } + cmd.Flag.Parse(args[1:]) + args = cmd.Flag.Args() + IsDebug = cmd.IsDebug + if !cmd.Run(cmd, args) { + fmt.Fprintf(os.Stderr, "\n") + cmd.Flag.Usage() + fmt.Fprintf(os.Stderr, "Default Parameters:\n") + cmd.Flag.PrintDefaults() + } + exit() + return + } + } + + fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0]) + setExitStatus(2) + exit() +} + +var usageTemplate = `WeedFS is a software to store billions of files and serve them fast! + +Usage: + + weed command [arguments] + +The commands are: +{{range .}}{{if .Runnable}} + {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}} + +Use "weed help [command]" for more information about a command. + +` + +var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}} +{{end}} + {{.Long}} +` + +// tmpl executes the given template text on data, writing the result to w. +func tmpl(w io.Writer, text string, data interface{}) { + t := template.New("top") + t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize}) + template.Must(t.Parse(text)) + if err := t.Execute(w, data); err != nil { + panic(err) + } +} + +func capitalize(s string) string { + if s == "" { + return s + } + r, n := utf8.DecodeRuneInString(s) + return string(unicode.ToTitle(r)) + s[n:] +} + +func printUsage(w io.Writer) { + tmpl(w, usageTemplate, commands) +} + +func usage() { + printUsage(os.Stderr) + os.Exit(2) +} + +// help implements the 'help' command. +func help(args []string) { + if len(args) == 0 { + printUsage(os.Stdout) + // not exit 2: succeeded at 'weed help'. + return + } + if len(args) != 1 { + fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n") + os.Exit(2) // failed at 'weed help' + } + + arg := args[0] + + for _, cmd := range commands { + if cmd.Name() == arg { + tmpl(os.Stdout, helpTemplate, cmd) + // not exit 2: succeeded at 'weed help cmd'. + return + } + } + + fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg) + os.Exit(2) // failed at 'weed help cmd' +} + +var atexitFuncs []func() + +func atexit(f func()) { + atexitFuncs = append(atexitFuncs, f) +} + +func exit() { + for _, f := range atexitFuncs { + f() + } + os.Exit(exitStatus) +} + +func exitIfErrors() { + if exitStatus != 0 { + exit() + } +} +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + w.Header().Set("Content-Type", "application/javascript") + var bytes []byte + if r.FormValue("pretty") != "" { + bytes, _ = json.MarshalIndent(obj, "", " ") + } else { + bytes, _ = json.Marshal(obj) + } + callback := r.FormValue("callback") + if callback == "" { + w.Write(bytes) + } else { + w.Write([]uint8(callback)) + w.Write([]uint8("(")) + fmt.Fprint(w, string(bytes)) + w.Write([]uint8(")")) + } +} + +func debug(params ...interface{}) { + if *IsDebug { + fmt.Println(params) + } +} |
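Editorial note: the Uint32toBytes/Uint64toBytes/BytesToUint32 helpers above encode integers big-endian (most significant byte first), and BytesToUint32 tolerates slices shorter than four bytes, which is what lets callers trim leading zero bytes before hex-encoding an id. A minimal round-trip sketch, assuming the GOPATH-style import path weed/util used throughout this change:

package main

import (
	"fmt"
	"weed/util"
)

func main() {
	// Uint32toBytes writes the most significant byte first.
	b := make([]byte, 4)
	util.Uint32toBytes(b, 0x01020304)
	fmt.Printf("% x\n", b) // 01 02 03 04

	// BytesToUint32 accepts fewer than 4 bytes, so a value whose leading
	// zero bytes were trimmed still decodes to the same number.
	fmt.Println(util.BytesToUint32([]byte{0x01, 0x00})) // 256
}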
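A minimal usage sketch for the JSON Config helper in src/weed/util/config.go; the JSON document and key names are invented for illustration, and the import path weed/util is again assumed:

package main

import (
	"fmt"
	"weed/util"
)

func main() {
	// LoadConfigString unmarshals a JSON object into an untyped map and
	// exposes typed getters with fixed fallbacks for missing keys.
	conf := util.LoadConfigString(`{"name": "weed", "port": 9333, "debug": true}`)
	fmt.Println(conf.GetString("name")) // "weed"
	fmt.Println(conf.GetInt("port"))    // 9333 (JSON numbers arrive as float64, then cast)
	fmt.Println(conf.GetBool("debug"))  // true
	fmt.Println(conf.GetInt("missing")) // -1, the "not present" fallback

	// Note: the getters type-assert without a check, so asking for the wrong
	// type (e.g. conf.GetString("port")) panics rather than converting.
}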
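The volume server's /status handler can be exercised with the util.Post helper from src/weed/util/post.go; a sketch assuming a server started locally with the default -port=8080 (host and port are assumptions, not part of this change):

package main

import (
	"fmt"
	"net/url"
	"weed/util"
)

func main() {
	// Post form-encodes the values, issues the request, and returns the raw body.
	// /status replies with a JSON object carrying "Version" and "Volumes".
	body, err := util.Post("http://localhost:8080/status", url.Values{})
	if err != nil {
		fmt.Println("status request failed:", err)
		return
	}
	fmt.Println(string(body))
}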
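The GET/POST/DELETE handlers all rely on parseURLPath, which splits a request path of the form /<vid>,<fid>[.ext]. A standalone sketch of the same splitting logic (the id 3,01637037d6 is invented); note that the comma and dot offsets are computed relative to path[sepIndex:], so the slicing only lines up when the file id sits directly after the leading slash, as it does for volume server URLs:

package main

import (
	"fmt"
	"strings"
)

// same splitting as parseURLPath in src/weed/volume.go
func parse(path string) (vid, fid, ext string) {
	sepIndex := strings.LastIndex(path, "/")
	commaIndex := strings.LastIndex(path[sepIndex:], ",")
	if commaIndex <= 0 {
		return // no comma: not a file id (e.g. /favicon.ico)
	}
	dotIndex := strings.LastIndex(path[sepIndex:], ".")
	vid = path[sepIndex+1 : commaIndex]
	fid = path[commaIndex+1:]
	if dotIndex > 0 {
		fid = path[commaIndex+1 : dotIndex]
		ext = path[dotIndex:]
	}
	return
}

func main() {
	vid, fid, ext := parse("/3,01637037d6.jpg")
	fmt.Printf("vid=%s fid=%s ext=%s\n", vid, fid, ext) // vid=3 fid=01637037d6 ext=.jpg
}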
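In runVolume the background goroutine re-joins the master and then sleeps a randomized interval between pulseSeconds and 2×pulseSeconds, so a fleet of volume servers does not heartbeat in lockstep. A sketch of just that interval computation, using the -pulseSeconds default of 5:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	pulseSeconds := 5 // the -pulseSeconds default
	for i := 0; i < 3; i++ {
		// float32(pulse*1000) * (1 + rand.Float32()) milliseconds,
		// i.e. uniformly between 5s and 10s for the default setting.
		d := time.Duration(float32(pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond
		fmt.Println(d)
	}
}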
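writeJson in src/weed/weed.go is the common response path for the handlers above: ?pretty=<anything> switches to indented output and ?callback=fn wraps the JSON for JSONP. A self-contained sketch of that behaviour using a simplified stand-in for the function (the handler and payload are invented):

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// simplified stand-in for writeJson in src/weed/weed.go
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
	w.Header().Set("Content-Type", "application/javascript")
	var b []byte
	if r.FormValue("pretty") != "" {
		b, _ = json.MarshalIndent(obj, "", " ")
	} else {
		b, _ = json.Marshal(obj)
	}
	if cb := r.FormValue("callback"); cb != "" {
		fmt.Fprintf(w, "%s(%s)", cb, b) // JSONP: wrap the body in the callback
		return
	}
	w.Write(b)
}

func main() {
	h := func(w http.ResponseWriter, r *http.Request) {
		writeJson(w, r, map[string]interface{}{"Version": "0.28 beta"})
	}
	rec := httptest.NewRecorder()
	h(rec, httptest.NewRequest("GET", "/status?callback=load", nil))
	fmt.Println(rec.Body.String()) // load({"Version":"0.28 beta"})
}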
