aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-01-15 18:08:19 -0800
committerChris Lu <chris.lu@gmail.com>2013-01-15 18:08:19 -0800
commitbe83a56bb925ab5b6a1f6beba49d82722184d352 (patch)
tree7b838e2d15b76e9a3ab49acfd089928bae6262d2
parent5f278c6bd70575e52c6577796f306c2a37c55f43 (diff)
parent92ffba2ab9561c066ec12379f2288f2a3ea1d9c5 (diff)
downloadseaweedfs-be83a56bb925ab5b6a1f6beba49d82722184d352.tar.xz
seaweedfs-be83a56bb925ab5b6a1f6beba49d82722184d352.zip
Merge remote-tracking branch 'choose_remote_name/cdb'
-rw-r--r--weed-fs/src/cmd/dump/main.go96
-rw-r--r--weed-fs/src/cmd/weed/command.go59
-rw-r--r--weed-fs/src/cmd/weed/fix.go27
-rw-r--r--weed-fs/src/cmd/weed/master.go10
-rw-r--r--weed-fs/src/cmd/weed/shell.go73
-rw-r--r--weed-fs/src/cmd/weed/upload.go2
-rw-r--r--weed-fs/src/cmd/weed/volume.go6
-rw-r--r--weed-fs/src/cmd/weed/weed.go5
-rw-r--r--weed-fs/src/pkg/directory/file_id.go6
-rw-r--r--weed-fs/src/pkg/operation/allocate_volume.go44
-rw-r--r--weed-fs/src/pkg/operation/delete_content.go2
-rw-r--r--weed-fs/src/pkg/operation/lookup_volume_id.go50
-rw-r--r--weed-fs/src/pkg/operation/upload_content.go18
-rw-r--r--weed-fs/src/pkg/replication/volume_growth.go10
-rw-r--r--weed-fs/src/pkg/replication/volume_growth_test.go17
-rw-r--r--weed-fs/src/pkg/sequence/sequence.go52
-rw-r--r--weed-fs/src/pkg/storage/cdb_map.go112
-rw-r--r--weed-fs/src/pkg/storage/compact_map.go24
-rw-r--r--weed-fs/src/pkg/storage/compact_map_perf_test.go60
-rw-r--r--weed-fs/src/pkg/storage/compact_map_test.go38
-rw-r--r--weed-fs/src/pkg/storage/compress.go44
-rw-r--r--weed-fs/src/pkg/storage/needle.go8
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go173
-rw-r--r--weed-fs/src/pkg/storage/needle_read_write.go9
-rw-r--r--weed-fs/src/pkg/storage/replication_type.go202
-rw-r--r--weed-fs/src/pkg/storage/store.go41
-rw-r--r--weed-fs/src/pkg/storage/volume.go91
-rw-r--r--weed-fs/src/pkg/storage/volume_id.go17
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go1
-rw-r--r--weed-fs/src/pkg/storage/volume_version.go9
-rw-r--r--weed-fs/src/pkg/topology/configuration_test.go12
-rw-r--r--weed-fs/src/pkg/topology/data_center.go45
-rw-r--r--weed-fs/src/pkg/topology/node_list.go12
-rw-r--r--weed-fs/src/pkg/topology/node_list_test.go34
-rw-r--r--weed-fs/src/pkg/topology/rack.go14
-rw-r--r--weed-fs/src/pkg/topology/topo_test.go10
-rw-r--r--weed-fs/src/pkg/topology/topology_compact.go4
-rw-r--r--weed-fs/src/pkg/topology/topology_event_handling.go4
-rw-r--r--weed-fs/src/pkg/topology/topology_map.go3
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go43
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go16
-rw-r--r--weed-fs/src/pkg/util/bytes.go53
-rw-r--r--weed-fs/src/pkg/util/file.go63
-rw-r--r--weed-fs/src/pkg/util/parse.go20
-rw-r--r--weed-fs/src/pkg/util/post.go2
45 files changed, 1086 insertions, 555 deletions
diff --git a/weed-fs/src/cmd/dump/main.go b/weed-fs/src/cmd/dump/main.go
new file mode 100644
index 000000000..e3e151eb7
--- /dev/null
+++ b/weed-fs/src/cmd/dump/main.go
@@ -0,0 +1,96 @@
+// Copyright Tamás Gulácsi 2013 All rights reserved
+// Use of this source is governed by the same rules as the weed-fs library.
+// If this would be ambigous, than Apache License 2.0 has to be used.
+//
+// dump dumps the files of a volume to tar or unique files.
+// Each file will have id#mimetype#original_name file format
+
+package main
+
+import (
+ "archive/tar"
+ "bytes"
+ "flag"
+ "fmt"
+ // "io"
+ "log"
+ "os"
+ "pkg/storage"
+ "strings"
+ "time"
+)
+
+var (
+ volumePath = flag.String("dir", "/tmp", "volume directory")
+ volumeId = flag.Int("id", 0, "volume Id")
+ dest = flag.String("out", "-", "output path. Produces tar if path ends with .tar; creates files otherwise.")
+ tarFh *tar.Writer
+ tarHeader tar.Header
+ counter int
+)
+
+func main() {
+ var err error
+
+ flag.Parse()
+
+ if *dest == "-" {
+ *dest = ""
+ }
+ if *dest == "" || strings.HasSuffix(*dest, ".tar") {
+ var fh *os.File
+ if *dest == "" {
+ fh = os.Stdout
+ } else {
+ if fh, err = os.Create(*dest); err != nil {
+ log.Printf("cannot open output tar %s: %s", *dest, err)
+ return
+ }
+ }
+ defer fh.Close()
+ tarFh = tar.NewWriter(fh)
+ defer tarFh.Close()
+ t := time.Now()
+ tarHeader = tar.Header{Mode: 0644,
+ ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
+ Typeflag: tar.TypeReg,
+ AccessTime: t, ChangeTime: t}
+ }
+
+ v, err := storage.NewVolume(*volumePath, storage.VolumeId(*volumeId), storage.CopyNil)
+ if v == nil || v.Version() == 0 || err != nil {
+ log.Printf("cannot load volume %d from %s (%s): %s", *volumeId, *volumePath, v, err)
+ return
+ }
+ log.Printf("volume: %s (ver. %d)", v, v.Version())
+ if err := v.WalkValues(walker); err != nil {
+ log.Printf("error while walking: %s", err)
+ return
+ }
+
+ log.Printf("%d files written.", counter)
+}
+
+func walker(n *storage.Needle) (err error) {
+ // log.Printf("Id=%d Size=%d Name=%s mime=%s", n.Id, n.Size, n.Name, n.Mime)
+ nm := fmt.Sprintf("%d#%s#%s", n.Id, bytes.Replace(n.Mime, []byte{'/'}, []byte{'_'}, -1), n.Name)
+ // log.Print(nm)
+ if tarFh != nil {
+ tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
+ if err = tarFh.WriteHeader(&tarHeader); err != nil {
+ return err
+ }
+ _, err = tarFh.Write(n.Data)
+ } else {
+ if fh, e := os.Create(*dest + "/" + nm); e != nil {
+ return e
+ } else {
+ defer fh.Close()
+ _, err = fh.Write(n.Data)
+ }
+ }
+ if err == nil {
+ counter++
+ }
+ return
+}
diff --git a/weed-fs/src/cmd/weed/command.go b/weed-fs/src/cmd/weed/command.go
index 8c725cafb..4d68ff151 100644
--- a/weed-fs/src/cmd/weed/command.go
+++ b/weed-fs/src/cmd/weed/command.go
@@ -1,53 +1,52 @@
package main
import (
- "flag"
- "fmt"
- "os"
- "strings"
+ "flag"
+ "fmt"
+ "os"
+ "strings"
)
type Command struct {
- // Run runs the command.
- // The args are the arguments after the command name.
- Run func(cmd *Command, args []string) bool
+ // Run runs the command.
+ // The args are the arguments after the command name.
+ Run func(cmd *Command, args []string) bool
- // UsageLine is the one-line usage message.
- // The first word in the line is taken to be the command name.
- UsageLine string
+ // UsageLine is the one-line usage message.
+ // The first word in the line is taken to be the command name.
+ UsageLine string
- // Short is the short description shown in the 'go help' output.
- Short string
+ // Short is the short description shown in the 'go help' output.
+ Short string
- // Long is the long message shown in the 'go help <this-command>' output.
- Long string
-
- // Flag is a set of flags specific to this command.
- Flag flag.FlagSet
+ // Long is the long message shown in the 'go help <this-command>' output.
+ Long string
+ // Flag is a set of flags specific to this command.
+ Flag flag.FlagSet
}
// Name returns the command's name: the first word in the usage line.
func (c *Command) Name() string {
- name := c.UsageLine
- i := strings.Index(name, " ")
- if i >= 0 {
- name = name[:i]
- }
- return name
+ name := c.UsageLine
+ i := strings.Index(name, " ")
+ if i >= 0 {
+ name = name[:i]
+ }
+ return name
}
func (c *Command) Usage() {
- fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
- fmt.Fprintf(os.Stderr, "Default Usage:\n")
- c.Flag.PrintDefaults()
- fmt.Fprintf(os.Stderr, "Description:\n")
- fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
- os.Exit(2)
+ fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
+ fmt.Fprintf(os.Stderr, "Default Usage:\n")
+ c.Flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, "Description:\n")
+ fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
+ os.Exit(2)
}
// Runnable reports whether the command can be run; otherwise
// it is a documentation pseudo-command such as importpath.
func (c *Command) Runnable() bool {
- return c.Run != nil
+ return c.Run != nil
}
diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go
index 7bed70edd..53b6cfc75 100644
--- a/weed-fs/src/cmd/weed/fix.go
+++ b/weed-fs/src/cmd/weed/fix.go
@@ -1,6 +1,7 @@
package main
import (
+ "errors"
"log"
"os"
"path"
@@ -33,24 +34,36 @@ func runFix(cmd *Command, args []string) bool {
}
fileName := strconv.Itoa(*volumeId)
- dataFile, e := os.OpenFile(path.Join(*dir, fileName+".dat"), os.O_RDONLY, 0644)
+
+ if err := createIndexFile(path.Join(*dir, fileName+".dat")); err != nil {
+ log.Fatalf("[ERROR] " + err.Error())
+ }
+ return true
+}
+
+func createIndexFile(datafn string) error {
+ dataFile, e := os.OpenFile(datafn, os.O_RDONLY, 0644)
if e != nil {
- log.Fatalf("Read Volume [ERROR] %s\n", e)
+ return errors.New("Read Volume " + e.Error())
}
defer dataFile.Close()
- indexFile, ie := os.OpenFile(path.Join(*dir, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
+ // log.Printf("dataFile=%s", dataFile)
+ indexFile, ie := os.OpenFile(datafn[:len(datafn)-4]+".idx", os.O_WRONLY|os.O_CREATE, 0644)
if ie != nil {
- log.Fatalf("Create Volume Index [ERROR] %s\n", ie)
+ return errors.New("Create Volume Index " + ie.Error())
}
defer indexFile.Close()
dataFile.Seek(0, 0)
header := make([]byte, storage.SuperBlockSize)
if _, e := dataFile.Read(header); e != nil {
- log.Fatalf("cannot read superblock: %s", e)
+ return errors.New("cannot read superblock: " + e.Error())
}
- ver, _, _ := storage.ParseSuperBlock(header)
+ ver, _, e := storage.ParseSuperBlock(header)
+ if e != nil {
+ return errors.New("cannot parse superblock: " + e.Error())
+ }
n, rest := storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1)
@@ -66,5 +79,5 @@ func runFix(cmd *Command, args []string) bool {
n, rest = storage.ReadNeedleHeader(dataFile, ver)
dataFile.Seek(int64(rest), 1)
}
- return true
+ return nil
}
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index c60974a67..151ae31fc 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -107,8 +107,14 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
if ip == "" {
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
}
- port, _ := strconv.Atoi(r.FormValue("port"))
- maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ port, err := strconv.Atoi(r.FormValue("port"))
+ if err != nil {
+ log.Printf("ERROR bad port number %s: %s", r.FormValue("port"), err)
+ }
+ maxVolumeCount, err := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ if err != nil {
+ log.Printf("ERROR bad maxVolumeCount %s: %s", r.FormValue("maxVolumeCount"), err)
+ }
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
diff --git a/weed-fs/src/cmd/weed/shell.go b/weed-fs/src/cmd/weed/shell.go
index 78a4b9eb1..daf0b7e1f 100644
--- a/weed-fs/src/cmd/weed/shell.go
+++ b/weed-fs/src/cmd/weed/shell.go
@@ -1,54 +1,53 @@
package main
import (
- "bufio"
- "os"
- "fmt"
+ "bufio"
+ "fmt"
+ "os"
)
func init() {
- cmdShell.Run = runShell // break init cycle
+ cmdShell.Run = runShell // break init cycle
}
var cmdShell = &Command{
- UsageLine: "shell",
- Short: "run interactive commands, now just echo",
- Long: `run interactive commands.
+ UsageLine: "shell",
+ Short: "run interactive commands, now just echo",
+ Long: `run interactive commands.
`,
}
-var (
-)
+var ()
func runShell(command *Command, args []string) bool {
- r := bufio.NewReader(os.Stdin)
- o := bufio.NewWriter(os.Stdout)
- e := bufio.NewWriter(os.Stderr)
- prompt := func () {
- o.WriteString("> ")
- o.Flush()
- };
- readLine := func () string {
- ret, err := r.ReadString('\n')
- if err != nil {
- fmt.Fprint(e,err);
- os.Exit(1)
- }
- return ret
- }
- execCmd := func (cmd string) int {
- if cmd != "" {
- o.WriteString(cmd)
- }
- return 0
- }
+ r := bufio.NewReader(os.Stdin)
+ o := bufio.NewWriter(os.Stdout)
+ e := bufio.NewWriter(os.Stderr)
+ prompt := func() {
+ o.WriteString("> ")
+ o.Flush()
+ }
+ readLine := func() string {
+ ret, err := r.ReadString('\n')
+ if err != nil {
+ fmt.Fprint(e, err)
+ os.Exit(1)
+ }
+ return ret
+ }
+ execCmd := func(cmd string) int {
+ if cmd != "" {
+ o.WriteString(cmd)
+ }
+ return 0
+ }
- cmd := ""
- for {
- prompt()
- cmd = readLine()
- execCmd(cmd)
- }
- return true
+ cmd := ""
+ for {
+ prompt()
+ cmd = readLine()
+ execCmd(cmd)
+ }
+ return true
}
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
index e25930b5d..5707fda56 100644
--- a/weed-fs/src/cmd/weed/upload.go
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -67,7 +67,7 @@ func upload(filename string, server string, fid string) (int, error) {
}
ret, e := operation.Upload("http://"+server+"/"+fid, filename, fh)
if e != nil {
- return 0, e
+ return 0, e
}
return ret.Size, e
}
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 59a5623ea..576096dbb 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -42,7 +42,7 @@ var (
store *storage.Store
)
-var fileNameEscaper = strings.NewReplacer("\\","\\\\","\"","\\\"")
+var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
@@ -156,7 +156,9 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
} else {
- n.Data = storage.UnGzipData(n.Data)
+ if n.Data, err = storage.UnGzipData(n.Data); err != nil {
+ debug("lookup error:", err, r.URL.Path)
+ }
}
}
}
diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go
index 232520e75..685027fb6 100644
--- a/weed-fs/src/cmd/weed/weed.go
+++ b/weed-fs/src/cmd/weed/weed.go
@@ -21,6 +21,7 @@ var server *string
var commands = []*Command{
cmdFix,
+ cmdFreeze,
cmdMaster,
cmdUpload,
cmdShell,
@@ -175,9 +176,9 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
w.Header().Set("Content-Type", "application/javascript")
var bytes []byte
if r.FormValue("pretty") != "" {
- bytes, _ = json.MarshalIndent(obj, "", " ")
+ bytes, _ = json.MarshalIndent(obj, "", " ")
} else {
- bytes, _ = json.Marshal(obj)
+ bytes, _ = json.Marshal(obj)
}
callback := r.FormValue("callback")
if callback == "" {
diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go
index 9ce556580..cd4204f32 100644
--- a/weed-fs/src/pkg/directory/file_id.go
+++ b/weed-fs/src/pkg/directory/file_id.go
@@ -3,8 +3,8 @@ package directory
import (
"encoding/hex"
"pkg/storage"
- "strings"
"pkg/util"
+ "strings"
)
type FileId struct {
@@ -16,14 +16,14 @@ type FileId struct {
func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId {
return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
}
-func ParseFileId(fid string) *FileId{
+func ParseFileId(fid string) *FileId {
a := strings.Split(fid, ",")
if len(a) != 2 {
println("Invalid fid", fid, ", split length", len(a))
return nil
}
vid_string, key_hash_string := a[0], a[1]
- volumeId, _ := storage.NewVolumeId(vid_string)
+ volumeId, _ := storage.NewVolumeId(vid_string)
key, hash := storage.ParseKeyHash(key_hash_string)
return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}
}
diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go
index 6a3512896..c93ccfb62 100644
--- a/weed-fs/src/pkg/operation/allocate_volume.go
+++ b/weed-fs/src/pkg/operation/allocate_volume.go
@@ -1,32 +1,32 @@
package operation
import (
- "encoding/json"
- "errors"
- "net/url"
- "pkg/storage"
- "pkg/topology"
- "pkg/util"
+ "encoding/json"
+ "errors"
+ "net/url"
+ "pkg/storage"
+ "pkg/topology"
+ "pkg/util"
)
type AllocateVolumeResult struct {
- Error string
+ Error string
}
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("replicationType", repType.String())
- jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
- if err != nil {
- return err
- }
- var ret AllocateVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("replicationType", repType.String())
+ jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
+ if err != nil {
+ return err
+ }
+ var ret AllocateVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
}
diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go
index aeab9c3ac..2bdb49651 100644
--- a/weed-fs/src/pkg/operation/delete_content.go
+++ b/weed-fs/src/pkg/operation/delete_content.go
@@ -1,8 +1,8 @@
package operation
import (
- "net/http"
"log"
+ "net/http"
)
func Delete(url string) error {
diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go
index c46c6670e..50a6d91e6 100644
--- a/weed-fs/src/pkg/operation/lookup_volume_id.go
+++ b/weed-fs/src/pkg/operation/lookup_volume_id.go
@@ -1,38 +1,38 @@
package operation
import (
- "encoding/json"
- "net/url"
- "pkg/storage"
- "pkg/util"
- _ "fmt"
- "errors"
+ "encoding/json"
+ "errors"
+ _ "fmt"
+ "net/url"
+ "pkg/storage"
+ "pkg/util"
)
type Location struct {
- Url string "url"
- PublicUrl string "publicUrl"
+ Url string "url"
+ PublicUrl string "publicUrl"
}
type LookupResult struct {
- Locations []Location "locations"
- Error string "error"
+ Locations []Location "locations"
+ Error string "error"
}
//TODO: Add a caching for vid here
func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
- values := make(url.Values)
- values.Add("volumeId", vid.String())
- jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
- if err != nil {
- return nil, err
- }
- var ret LookupResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- if ret.Error != ""{
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
+ values := make(url.Values)
+ values.Add("volumeId", vid.String())
+ jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
+ if err != nil {
+ return nil, err
+ }
+ var ret LookupResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ if ret.Error != "" {
+ return nil, errors.New(ret.Error)
+ }
+ return &ret, nil
}
diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go
index 7ed74e02f..0bdb697da 100644
--- a/weed-fs/src/pkg/operation/upload_content.go
+++ b/weed-fs/src/pkg/operation/upload_content.go
@@ -3,18 +3,18 @@ package operation
import (
"bytes"
"encoding/json"
+ "errors"
_ "fmt"
"io"
"io/ioutil"
- "log"
+ "log"
"mime/multipart"
"net/http"
- "errors"
)
type UploadResult struct {
- Size int
- Error string
+ Size int
+ Error string
}
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
@@ -26,7 +26,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
- log.Println("failing to upload to", uploadUrl)
+ log.Println("failing to upload to", uploadUrl)
return nil, err
}
defer resp.Body.Close()
@@ -37,11 +37,11 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
var ret UploadResult
err = json.Unmarshal(resp_body, &ret)
if err != nil {
- log.Println("failing to read upload resonse", uploadUrl, resp_body)
- return nil, err
+ log.Println("failing to read upload resonse", uploadUrl, resp_body)
+ return nil, err
}
- if ret.Error != ""{
- return nil, errors.New(ret.Error)
+ if ret.Error != "" {
+ return nil, errors.New(ret.Error)
}
return &ret, nil
}
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
index 7cabf626e..ce0094a7c 100644
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ b/weed-fs/src/pkg/replication/volume_growth.go
@@ -7,7 +7,7 @@ import (
"pkg/operation"
"pkg/storage"
"pkg/topology"
- "sync"
+ "sync"
)
/*
@@ -24,7 +24,7 @@ type VolumeGrowth struct {
copy3factor int
copyAll int
- accessLock sync.Mutex
+ accessLock sync.Mutex
}
func NewDefaultVolumeGrowth() *VolumeGrowth {
@@ -49,8 +49,8 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
return 0, errors.New("Unknown Replication Type!")
}
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
- vg.accessLock.Lock()
- defer vg.accessLock.Unlock()
+ vg.accessLock.Lock()
+ defer vg.accessLock.Unlock()
counter = 0
switch repType {
@@ -182,7 +182,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version:storage.CurrentVersion}
+ vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server)
diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go
index 51e47c193..659564c64 100644
--- a/weed-fs/src/pkg/replication/volume_growth_test.go
+++ b/weed-fs/src/pkg/replication/volume_growth_test.go
@@ -5,7 +5,7 @@ import (
"fmt"
"math/rand"
"pkg/storage"
- "pkg/topology"
+ "pkg/topology"
"testing"
"time"
)
@@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5)
+ topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey)
@@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
+ vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@@ -121,10 +121,9 @@ func TestRemoveDataCenter(t *testing.T) {
func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
- if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{
- t.Log("reserved", c)
- }
+ rand.Seed(time.Now().UnixNano())
+ vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
+ if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
+ t.Log("reserved", c)
+ }
}
-
diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go
index bfdf1b368..c85289468 100644
--- a/weed-fs/src/pkg/sequence/sequence.go
+++ b/weed-fs/src/pkg/sequence/sequence.go
@@ -1,11 +1,11 @@
package sequence
import (
- "encoding/gob"
- "os"
- "path"
- "sync"
+ "encoding/gob"
"log"
+ "os"
+ "path"
+ "sync"
)
const (
@@ -27,21 +27,21 @@ type SequencerImpl struct {
}
func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
- m = &SequencerImpl{dir: dirname, fileName: filename}
+ m = &SequencerImpl{dir: dirname, fileName: filename}
- seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
- if se != nil {
- m.FileIdSequence = FileIdSaveInterval
- log.Println("Setting file id sequence", m.FileIdSequence)
- } else {
- decoder := gob.NewDecoder(seqFile)
- defer seqFile.Close()
- decoder.Decode(&m.FileIdSequence)
- log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
- //in case the server stops between intervals
- m.FileIdSequence += FileIdSaveInterval
- }
- return
+ seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
+ if se != nil {
+ m.FileIdSequence = FileIdSaveInterval
+ log.Println("Setting file id sequence", m.FileIdSequence)
+ } else {
+ decoder := gob.NewDecoder(seqFile)
+ defer seqFile.Close()
+ decoder.Decode(&m.FileIdSequence)
+ log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
+ //in case the server stops between intervals
+ m.FileIdSequence += FileIdSaveInterval
+ }
+ return
}
//count should be 1 or more
@@ -60,12 +60,12 @@ func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
return m.FileIdSequence - m.fileIdCounter, count
}
func (m *SequencerImpl) saveSequence() {
- log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
- seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
- if e != nil {
- log.Fatalf("Sequence File Save [ERROR] %s\n", e)
- }
- defer seqFile.Close()
- encoder := gob.NewEncoder(seqFile)
- encoder.Encode(m.FileIdSequence)
+ log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
+ seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
+ if e != nil {
+ log.Fatalf("Sequence File Save [ERROR] %s\n", e)
+ }
+ defer seqFile.Close()
+ encoder := gob.NewEncoder(seqFile)
+ encoder.Encode(m.FileIdSequence)
}
diff --git a/weed-fs/src/pkg/storage/cdb_map.go b/weed-fs/src/pkg/storage/cdb_map.go
new file mode 100644
index 000000000..bffb2b9ea
--- /dev/null
+++ b/weed-fs/src/pkg/storage/cdb_map.go
@@ -0,0 +1,112 @@
+package storage
+
+import (
+ "github.com/tgulacsi/go-cdb"
+ "io"
+ "log"
+ "os"
+ "pkg/util"
+ "strings"
+)
+
+type CdbMap struct {
+ db *cdb.Cdb
+ transient []byte
+ Filename string
+}
+
+// Opens the CDB file and servers as a needle map
+func NewCdbMap(filename string) (*CdbMap, error) {
+ m, err := cdb.Open(filename)
+ if err != nil {
+ return nil, err
+ }
+ return &CdbMap{db: m, transient: make([]byte, 8),
+ Filename: filename}, nil
+}
+
+// writes the content of the index file to a CDB and returns that
+func NewCdbMapFromIndex(indexFile *os.File) (*CdbMap, error) {
+ nm := indexFile.Name()
+ nm = nm[:strings.LastIndex(nm, ".")+1] + "cdb"
+
+ var (
+ key uint64
+ offset uint32
+ ok bool
+ )
+ deleted := make(map[uint64]bool, 16)
+ gatherDeletes := func(buf []byte) error {
+ key = util.BytesToUint64(buf[:8])
+ offset = util.BytesToUint32(buf[8:12])
+ if offset > 0 {
+ if _, ok = deleted[key]; ok { //undelete
+ delete(deleted, key)
+ }
+ } else {
+ deleted[key] = true
+ }
+ return nil
+ }
+ if err := readIndexFile(indexFile, gatherDeletes); err != nil {
+ return nil, err
+ }
+
+ log.Printf("deleted: %s\nnm=%s", deleted, nm)
+ w, err := cdb.NewWriter(nm)
+ if err != nil {
+ return nil, err
+ }
+ iterFun := func(buf []byte) error {
+ key = util.BytesToUint64(buf[:8])
+ log.Printf("iter key=%d", key)
+ if _, ok = deleted[key]; !ok {
+ w.PutPair(buf[:8], buf[8:16])
+ }
+ return nil
+ }
+ indexFile.Seek(0, 0)
+ err = readIndexFile(indexFile, iterFun)
+ w.Close()
+ if err != nil {
+ return nil, err
+ }
+ if err = util.SetFilePerm(nil, nm, 0444, -1); err != nil {
+ return nil, err
+ }
+
+ return NewCdbMap(nm)
+}
+
+func (m *CdbMap) Get(key Key) (element *NeedleValue, ok bool) {
+ util.Uint64toBytes(m.transient, uint64(key))
+ data, err := m.db.Data(m.transient)
+ if err != nil {
+ if err == io.EOF {
+ return nil, false
+ }
+ log.Printf("error getting %s: %s", key, err)
+ return nil, false
+ }
+ return &NeedleValue{Key: key,
+ Offset: util.BytesToUint32(data[:4]),
+ Size: util.BytesToUint32(data[4:8]),
+ }, true
+}
+
+func (m *CdbMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
+ r, err := os.Open(m.Filename)
+ if err != nil {
+ return err
+ }
+ defer r.Close()
+
+ iterFunc := func(elt cdb.Element) error {
+ return pedestrian(&NeedleValue{
+ Key: Key(util.BytesToUint64(elt.Key[:8])),
+ Offset: util.BytesToUint32(elt.Data[:4]),
+ Size: util.BytesToUint32(elt.Data[4:8]),
+ })
+ }
+ return cdb.DumpMap(r, iterFunc)
+}
diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go
index 7365022ea..61cc2c841 100644
--- a/weed-fs/src/pkg/storage/compact_map.go
+++ b/weed-fs/src/pkg/storage/compact_map.go
@@ -109,8 +109,8 @@ type CompactMap struct {
list []CompactSection
}
-func NewCompactMap() CompactMap {
- return CompactMap{}
+func NewCompactMap() *CompactMap {
+ return &CompactMap{}
}
func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
@@ -175,3 +175,23 @@ func (cm *CompactMap) Peek() {
}
}
}
+
+// iterate over the keys by calling iterate on each key till error is returned
+func (cm *CompactMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
+ var i int
+ for _, cs := range cm.list {
+ for key := cs.start; key < cs.end; key++ {
+ if i = cs.binarySearchValues(key); i >= 0 {
+ if err = pedestrian(&cs.values[i]); err != nil {
+ return
+ }
+ }
+ }
+ for _, val := range cs.overflow {
+ if err = pedestrian(val); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go
index 2e2227279..cfa521fc8 100644
--- a/weed-fs/src/pkg/storage/compact_map_perf_test.go
+++ b/weed-fs/src/pkg/storage/compact_map_perf_test.go
@@ -1,43 +1,43 @@
package storage
import (
+ "log"
+ "os"
+ "pkg/util"
"testing"
- "log"
- "os"
- "pkg/util"
)
func TestMemoryUsage(t *testing.T) {
- indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
- if ie != nil {
- log.Fatalln(ie)
- }
- LoadNewNeedleMap(indexFile)
-
+ indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
+ if ie != nil {
+ log.Fatalln(ie)
+ }
+ LoadNewNeedleMap(indexFile)
+
}
func LoadNewNeedleMap(file *os.File) CompactMap {
- m := NewCompactMap()
- bytes := make([]byte, 16*1024)
- count, e := file.Read(bytes)
- if count > 0 {
- fstat, _ := file.Stat()
- log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
- }
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- if offset > 0 {
- m.Set(Key(key), offset, size)
- } else {
- //delete(m, key)
- }
- }
+ m := NewCompactMap()
+ bytes := make([]byte, 16*1024)
+ count, e := file.Read(bytes)
+ if count > 0 {
+ fstat, _ := file.Stat()
+ log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+ }
+ for count > 0 && e == nil {
+ for i := 0; i < count; i += 16 {
+ key := util.BytesToUint64(bytes[i : i+8])
+ offset := util.BytesToUint32(bytes[i+8 : i+12])
+ size := util.BytesToUint32(bytes[i+12 : i+16])
+ if offset > 0 {
+ m.Set(Key(key), offset, size)
+ } else {
+ //delete(m, key)
+ }
+ }
- count, e = file.Read(bytes)
- }
- return m
+ count, e = file.Read(bytes)
+ }
+ return m
}
diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go
index c05515b29..e76e9578d 100644
--- a/weed-fs/src/pkg/storage/compact_map_test.go
+++ b/weed-fs/src/pkg/storage/compact_map_test.go
@@ -18,42 +18,42 @@ func TestXYZ(t *testing.T) {
m.Set(Key(i), i+11, i+5)
}
-// for i := uint32(0); i < 100; i++ {
-// if v := m.Get(Key(i)); v != nil {
-// println(i, "=", v.Key, v.Offset, v.Size)
-// }
-// }
-
+ // for i := uint32(0); i < 100; i++ {
+ // if v := m.Get(Key(i)); v != nil {
+ // println(i, "=", v.Key, v.Offset, v.Size)
+ // }
+ // }
+
for i := uint32(0); i < 10*batch; i++ {
- v, ok := m.Get(Key(i))
+ v, ok := m.Get(Key(i))
if i%3 == 0 {
- if !ok {
- t.Fatal("key", i, "missing!")
- }
+ if !ok {
+ t.Fatal("key", i, "missing!")
+ }
if v.Size != i+5 {
t.Fatal("key", i, "size", v.Size)
}
} else if i%37 == 0 {
- if ok && v.Size > 0 {
+ if ok && v.Size > 0 {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 {
- if v.Size != i {
+ if v.Size != i {
t.Fatal("key", i, "size", v.Size)
}
}
}
for i := uint32(10 * batch); i < 100*batch; i++ {
- v, ok := m.Get(Key(i))
+ v, ok := m.Get(Key(i))
if i%37 == 0 {
- if ok && v.Size > 0 {
- t.Fatal("key", i, "should have been deleted needle value", v)
- }
+ if ok && v.Size > 0 {
+ t.Fatal("key", i, "should have been deleted needle value", v)
+ }
} else if i%2 == 0 {
- if v==nil{
- t.Fatal("key", i, "missing")
- }
+ if v == nil {
+ t.Fatal("key", i, "missing")
+ }
if v.Size != i {
t.Fatal("key", i, "size", v.Size)
}
diff --git a/weed-fs/src/pkg/storage/compress.go b/weed-fs/src/pkg/storage/compress.go
index 9df85b4da..35de70600 100644
--- a/weed-fs/src/pkg/storage/compress.go
+++ b/weed-fs/src/pkg/storage/compress.go
@@ -10,54 +10,40 @@ import (
/*
* Default more not to gzip since gzip can be done on client side.
-*/
+ */
func IsGzippable(ext, mtype string) bool {
- if strings.HasPrefix(mtype, "text/"){
- return true
- }
- if ext == ".zip" {
- return false
- }
- if ext == ".rar" {
- return false
- }
- if ext == ".gz" {
- return false
- }
- if ext == ".pdf" {
+ if strings.HasPrefix(mtype, "text/") {
return true
}
- if ext == ".css" {
- return true
- }
- if ext == ".js" {
+ switch ext {
+ case ".zip", ".rar", ".gz", ".bz2", ".xz":
+ return false
+ case ".pdf", ".txt", ".html", ".css", ".js", ".json":
return true
}
- if ext == ".json" {
- return true
- }
if strings.HasPrefix(mtype, "application/") {
- if strings.HasSuffix(mtype, "xml") {
- return true
- }
- if strings.HasSuffix(mtype, "script") {
+ if strings.HasSuffix(mtype, "xml") ||
+ strings.HasSuffix(mtype, "script") {
return true
}
}
return false
}
-func GzipData(input []byte) []byte {
+
+func GzipData(input []byte) ([]byte, error) {
buf := new(bytes.Buffer)
w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
if _, err := w.Write(input); err != nil {
println("error compressing data:", err)
+ return nil, err
}
if err := w.Close(); err != nil {
println("error closing compressed data:", err)
+ return nil, err
}
- return buf.Bytes()
+ return buf.Bytes(), nil
}
-func UnGzipData(input []byte) []byte {
+func UnGzipData(input []byte) ([]byte, error) {
buf := bytes.NewBuffer(input)
r, _ := gzip.NewReader(buf)
defer r.Close()
@@ -65,5 +51,5 @@ func UnGzipData(input []byte) []byte {
if err != nil {
println("error uncompressing data:", err)
}
- return output
+ return output, err
}
diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go
index 867852362..1f778c7ff 100644
--- a/weed-fs/src/pkg/storage/needle.go
+++ b/weed-fs/src/pkg/storage/needle.go
@@ -12,8 +12,8 @@ import (
)
const (
- NeedleHeaderSize = 16 //should never change this
- NeedlePaddingSize = 8
+ NeedleHeaderSize = 16 //should never change this
+ NeedlePaddingSize = 8
NeedleChecksumSize = 4
)
@@ -64,7 +64,9 @@ func NewNeedle(r *http.Request) (n *Needle, fname string, e error) {
mtype = contentType
}
if IsGzippable(ext, mtype) {
- data = GzipData(data)
+ if data, e = GzipData(data); e != nil {
+ return
+ }
n.SetGzipped()
}
if ext == ".gz" {
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index e01c27630..b173eb47f 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -1,14 +1,18 @@
package storage
import (
+ "errors"
+ "io"
"log"
"os"
"pkg/util"
+ "strings"
)
type NeedleMap struct {
indexFile *os.File
- m CompactMap
+ m MapGetSetter // modifiable map
+ fm MapGetter // frozen map
//transient
bytes []byte
@@ -19,55 +23,148 @@ type NeedleMap struct {
fileByteCounter uint64
}
+// Map interface for frozen maps
+type MapGetter interface {
+ Get(key Key) (element *NeedleValue, ok bool)
+ Walk(pedestrian func(*NeedleValue) error) error
+}
+
+// Modifiable map interface
+type MapSetter interface {
+ Set(key Key, offset, size uint32) (oldsize uint32)
+ Delete(key Key) uint32
+}
+
+// Settable and gettable map
+type MapGetSetter interface {
+ MapGetter
+ MapSetter
+}
+
+// New in-memory needle map, backed by "file" index file
func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
+ return &NeedleMap{
m: NewCompactMap(),
bytes: make([]byte, 16),
indexFile: file,
}
- return nm
+}
+
+// Nes frozen (on-disk, not modifiable(!)) needle map
+func NewFrozenNeedleMap(fileName string) (*NeedleMap, error) {
+ if strings.HasSuffix(fileName, ".dat") {
+ fileName = fileName[:4]
+ }
+ var (
+ fm *CdbMap
+ indexExists bool
+ )
+ file, err := os.Open(fileName + ".idx")
+ if err != nil && os.IsNotExist(err) {
+ if fm, err = NewCdbMap(fileName + ".cdb"); err != nil {
+ log.Printf("error opening %s.cdb: %s", fileName, err)
+ fm = nil
+ } else {
+ if dstat, e := os.Stat(fileName + ".dat"); e == nil {
+ if cstat, e := os.Stat(fileName + ".cdb"); e == nil {
+ if cstat.ModTime().Before(dstat.ModTime()) {
+ return nil, errors.New("CDB file " + fileName +
+ ".cdb is older than data file " + fileName + ".dat!")
+ }
+ }
+ }
+ }
+ } else {
+ indexExists = true
+ }
+ if fm == nil {
+ fm, err = NewCdbMapFromIndex(file)
+ if err != nil {
+ return nil, err
+ }
+ if indexExists {
+ os.Remove(fileName + ".idx")
+ }
+ }
+ return &NeedleMap{
+ fm: fm,
+ bytes: make([]byte, 16),
+ }, nil
+}
+
+func (nm NeedleMap) IsFrozen() bool {
+ return nm.m == nil && nm.fm != nil
}
const (
RowsToRead = 1024
)
-func LoadNeedleMap(file *os.File) *NeedleMap {
+var MapIsFrozen = errors.New("Map is frozen!")
+
+func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
- bytes := make([]byte, 16*RowsToRead)
- count, e := nm.indexFile.Read(bytes)
- if count > 0 {
- fstat, _ := file.Stat()
- log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+
+ var (
+ key uint64
+ offset, size, oldSize uint32
+ )
+ iterFun := func(buf []byte) error {
+ key = util.BytesToUint64(buf[:8])
+ offset = util.BytesToUint32(buf[8:12])
+ size = util.BytesToUint32(buf[12:16])
+ nm.fileCounter++
+ nm.fileByteCounter = nm.fileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize = nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
+ }
+ } else {
+ nm.m.Delete(Key(key))
+ //log.Println("removing key", key)
+ nm.deletionCounter++
+ nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
+ }
+
+ return nil
+ }
+ if err := readIndexFile(file, iterFun); err != nil {
+ return nil, err
+ }
+ return nm, nil
+}
+
+// calls iterFun with each row (raw 16 bytes)
+func readIndexFile(indexFile *os.File, iterFun func([]byte) error) error {
+ buf := make([]byte, 16*RowsToRead)
+ count, e := io.ReadAtLeast(indexFile, buf, 16)
+ if e != nil && count > 0 {
+ fstat, err := indexFile.Stat()
+ if err != nil {
+ log.Println("ERROR stating %s: %s", indexFile, err)
+ } else {
+ log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
+ }
}
for count > 0 && e == nil {
for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- nm.fileCounter++
- nm.fileByteCounter = nm.fileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- } else {
- nm.m.Delete(Key(key))
- //log.Println("removing key", key)
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(size)
+ if e = iterFun(buf[i : i+16]); e != nil {
+ return e
}
}
- count, e = nm.indexFile.Read(bytes)
+ count, e = io.ReadAtLeast(indexFile, buf, 16)
}
- return nm
+ return nil
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
+ if nm.IsFrozen() {
+ return 0, MapIsFrozen
+ }
oldSize := nm.m.Set(Key(key), offset, size)
util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], offset)
@@ -81,16 +178,24 @@ func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
return nm.indexFile.Write(nm.bytes)
}
func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
+ if nm.m != nil {
+ element, ok = nm.m.Get(Key(key))
+ } else {
+ element, ok = nm.fm.Get(Key(key))
+ }
return
}
-func (nm *NeedleMap) Delete(key uint64) {
+func (nm *NeedleMap) Delete(key uint64) error {
+ if nm.IsFrozen() {
+ return MapIsFrozen
+ }
nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
util.Uint64toBytes(nm.bytes[0:8], key)
util.Uint32toBytes(nm.bytes[8:12], 0)
util.Uint32toBytes(nm.bytes[12:16], 0)
nm.indexFile.Write(nm.bytes)
nm.deletionCounter++
+ return nil
}
func (nm *NeedleMap) Close() {
nm.indexFile.Close()
@@ -98,3 +203,11 @@ func (nm *NeedleMap) Close() {
func (nm *NeedleMap) ContentSize() uint64 {
return nm.fileByteCounter
}
+
+// iterate through all needles using the iterator function
+func (nm *NeedleMap) Walk(pedestrian func(*NeedleValue) error) (err error) {
+ if nm.m != nil {
+ return nm.m.Walk(pedestrian)
+ }
+ return nm.fm.Walk(pedestrian)
+}
diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go
index 00844bad3..fdb09a3c6 100644
--- a/weed-fs/src/pkg/storage/needle_read_write.go
+++ b/weed-fs/src/pkg/storage/needle_read_write.go
@@ -2,10 +2,10 @@ package storage
import (
"errors"
+ "fmt"
"io"
"os"
"pkg/util"
- "fmt"
)
func (n *Needle) Append(w io.Writer, version Version) uint32 {
@@ -62,7 +62,8 @@ func (n *Needle) Append(w io.Writer, version Version) uint32 {
return n.Size
}
func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
- if version == Version1 {
+ switch version {
+ case Version1:
bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
ret, e := r.Read(bytes)
n.readNeedleHeader(bytes)
@@ -72,7 +73,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
return 0, errors.New("CRC error! Data On Disk Corrupted!")
}
return ret, e
- } else if version == Version2 {
+ case Version2:
if size == 0 {
return 0, nil
}
@@ -95,7 +96,7 @@ func (n *Needle) Read(r io.Reader, size uint32, version Version) (int, error) {
}
return ret, e
}
- return 0, errors.New("Unsupported Version!")
+ return 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func (n *Needle) readNeedleHeader(bytes []byte) {
n.Cookie = util.BytesToUint32(bytes[0:4])
diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go
index 86a9d219d..0902d1016 100644
--- a/weed-fs/src/pkg/storage/replication_type.go
+++ b/weed-fs/src/pkg/storage/replication_type.go
@@ -1,123 +1,123 @@
package storage
import (
- "errors"
+ "errors"
)
type ReplicationType string
const (
- Copy000 = ReplicationType("000") // single copy
- Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
- Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
- Copy100 = ReplicationType("100") // 2 copies, each on different data center
- Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
- Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
- LengthRelicationType = 6
- CopyNil = ReplicationType(255) // nil value
+ Copy000 = ReplicationType("000") // single copy
+ Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
+ Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
+ Copy100 = ReplicationType("100") // 2 copies, each on different data center
+ Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
+ Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
+ LengthRelicationType = 6
+ CopyNil = ReplicationType(255) // nil value
)
func NewReplicationTypeFromString(t string) (ReplicationType, error) {
- switch t {
- case "000":
- return Copy000, nil
- case "001":
- return Copy001, nil
- case "010":
- return Copy010, nil
- case "100":
- return Copy100, nil
- case "110":
- return Copy110, nil
- case "200":
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:"+t)
+ switch t {
+ case "000":
+ return Copy000, nil
+ case "001":
+ return Copy001, nil
+ case "010":
+ return Copy010, nil
+ case "100":
+ return Copy100, nil
+ case "110":
+ return Copy110, nil
+ case "200":
+ return Copy200, nil
+ }
+ return Copy000, errors.New("Unknown Replication Type:" + t)
}
func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
- switch b {
- case byte(000):
- return Copy000, nil
- case byte(001):
- return Copy001, nil
- case byte(010):
- return Copy010, nil
- case byte(100):
- return Copy100, nil
- case byte(110):
- return Copy110, nil
- case byte(200):
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:"+string(b))
+ switch b {
+ case byte(000):
+ return Copy000, nil
+ case byte(001):
+ return Copy001, nil
+ case byte(010):
+ return Copy010, nil
+ case byte(100):
+ return Copy100, nil
+ case byte(110):
+ return Copy110, nil
+ case byte(200):
+ return Copy200, nil
+ }
+ return Copy000, errors.New("Unknown Replication Type:" + string(b))
}
func (r *ReplicationType) String() string {
- switch *r {
- case Copy000:
- return "000"
- case Copy001:
- return "001"
- case Copy010:
- return "010"
- case Copy100:
- return "100"
- case Copy110:
- return "110"
- case Copy200:
- return "200"
- }
- return "000"
+ switch *r {
+ case Copy000:
+ return "000"
+ case Copy001:
+ return "001"
+ case Copy010:
+ return "010"
+ case Copy100:
+ return "100"
+ case Copy110:
+ return "110"
+ case Copy200:
+ return "200"
+ }
+ return "000"
}
func (r *ReplicationType) Byte() byte {
- switch *r {
- case Copy000:
- return byte(000)
- case Copy001:
- return byte(001)
- case Copy010:
- return byte(010)
- case Copy100:
- return byte(100)
- case Copy110:
- return byte(110)
- case Copy200:
- return byte(200)
- }
- return byte(000)
+ switch *r {
+ case Copy000:
+ return byte(000)
+ case Copy001:
+ return byte(001)
+ case Copy010:
+ return byte(010)
+ case Copy100:
+ return byte(100)
+ case Copy110:
+ return byte(110)
+ case Copy200:
+ return byte(200)
+ }
+ return byte(000)
}
-func (repType ReplicationType)GetReplicationLevelIndex() int {
- switch repType {
- case Copy000:
- return 0
- case Copy001:
- return 1
- case Copy010:
- return 2
- case Copy100:
- return 3
- case Copy110:
- return 4
- case Copy200:
- return 5
- }
- return -1
+func (repType ReplicationType) GetReplicationLevelIndex() int {
+ switch repType {
+ case Copy000:
+ return 0
+ case Copy001:
+ return 1
+ case Copy010:
+ return 2
+ case Copy100:
+ return 3
+ case Copy110:
+ return 4
+ case Copy200:
+ return 5
+ }
+ return -1
}
-func (repType ReplicationType)GetCopyCount() int {
- switch repType {
- case Copy000:
- return 1
- case Copy001:
- return 2
- case Copy010:
- return 2
- case Copy100:
- return 2
- case Copy110:
- return 3
- case Copy200:
- return 3
- }
- return 0
+func (repType ReplicationType) GetCopyCount() int {
+ switch repType {
+ case Copy000:
+ return 1
+ case Copy001:
+ return 2
+ case Copy010:
+ return 2
+ case Copy100:
+ return 2
+ case Copy110:
+ return 3
+ case Copy200:
+ return 3
+ }
+ return 0
}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index a2c5f040b..d9e94ee56 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -33,6 +33,8 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
return
}
+
+// adds a volume to the store
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
rt, e := NewReplicationTypeFromString(replicationType)
if e != nil {
@@ -65,15 +67,16 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
}
return e
}
-func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
+func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
if s.volumes[vid] != nil {
return errors.New("Volume Id " + vid.String() + " already exists!")
}
log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
- s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
- return nil
+ s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
+ return err
}
+// checks whether compaction is needed
func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@@ -85,6 +88,8 @@ func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString
}
return nil, garbageThreshold < s.volumes[vid].garbageLevel()
}
+
+// compacts the volume
func (s *Store) CompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@@ -92,6 +97,8 @@ func (s *Store) CompactVolume(volumeIdString string) error {
}
return s.volumes[vid].compact()
}
+
+// commits the compaction
func (s *Store) CommitCompactVolume(volumeIdString string) error {
vid, err := NewVolumeId(volumeIdString)
if err != nil {
@@ -99,6 +106,8 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
}
return s.volumes[vid].commitCompact()
}
+
+// reads directory and loads volumes
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {
@@ -107,9 +116,12 @@ func (s *Store) loadExistingVolumes() {
base := name[:len(name)-len(".dat")]
if vid, err := NewVolumeId(base); err == nil {
if s.volumes[vid] == nil {
- v := NewVolume(s.dir, vid, CopyNil)
- s.volumes[vid] = v
- log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size())
+ if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
+ s.volumes[vid] = v
+ log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.replicaType, "version =", v.version, "size =", v.Size(), "frozen?", !v.IsWritable())
+ } else {
+ log.Println("ERROR loading volume", vid, "in dir", s.dir, ":", e.Error())
+ }
}
}
}
@@ -119,8 +131,16 @@ func (s *Store) loadExistingVolumes() {
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount = VolumeId(k), v.ContentSize(), v.replicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s := &VolumeInfo{
+ Id: VolumeId(k),
+ Size: v.ContentSize(),
+ RepType: v.replicaType,
+ Version: v.Version(),
+ FileCount: v.nm.fileCounter,
+ DeleteCount: v.nm.deletionCounter,
+ DeletedByteCount: v.nm.deletionByteCounter,
+ Frozen: !v.IsWritable(),
+ }
stats = append(stats, s)
}
return stats
@@ -133,6 +153,8 @@ type JoinResult struct {
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
+
+// call master's /dir/join
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
@@ -170,7 +192,8 @@ func (s *Store) Close() {
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
if v := s.volumes[i]; v != nil {
size := v.write(n)
- if s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
+ if s.volumeSizeLimit < v.ContentSize()+uint64(size) &&
+ s.volumeSizeLimit >= v.ContentSize() {
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
s.Join()
}
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 0220bf895..9a7c33a42 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -3,8 +3,10 @@ package storage
import (
"errors"
"fmt"
+ "log"
"os"
"path"
+ "pkg/util"
"sync"
)
@@ -24,9 +26,9 @@ type Volume struct {
accessLock sync.Mutex
}
-func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
+func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
- v.load()
+ e = v.load()
return
}
func (v *Volume) load() error {
@@ -34,7 +36,14 @@ func (v *Volume) load() error {
fileName := path.Join(v.dir, v.Id.String())
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ if os.IsPermission(e) {
+ if util.FileExists(fileName + ".cdb") {
+ v.dataFile, e = os.Open(fileName + ".dat")
+ }
+ }
+ if e != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
}
if v.replicaType == CopyNil {
if e = v.readSuperBlock(); e != nil {
@@ -43,13 +52,19 @@ func (v *Volume) load() error {
} else {
v.maybeWriteSuperBlock()
}
- indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
- if ie != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ // TODO: if .idx not exists, but .cdb exists, then use (but don't load!) that
+ if !util.FileIsWritable(v.dataFile.Name()) { //Read-Only
+ v.nm, e = NewFrozenNeedleMap(fileName)
+ } else {
+ indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ if ie != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ v.nm, e = LoadNeedleMap(indexFile)
}
- v.nm = LoadNeedleMap(indexFile)
- return nil
+ return e
}
+
func (v *Volume) Version() Version {
return v.version
}
@@ -63,6 +78,18 @@ func (v *Volume) Size() int64 {
fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
return -1
}
+
+// a volume is writable, if its data file is writable and the index is not frozen
+func (v *Volume) IsWritable() bool {
+ stat, e := v.dataFile.Stat()
+ if e != nil {
+ log.Printf("Failed to read file permission %s %s\n", v.dataFile.Name(), e.Error())
+ return false
+ }
+ // 4 for r, 2 for w, 1 for x
+ return stat.Mode().Perm()&0222 > 0 && !v.nm.IsFrozen()
+}
+
func (v *Volume) Close() {
v.accessLock.Lock()
defer v.accessLock.Unlock()
@@ -79,21 +106,23 @@ func (v *Volume) maybeWriteSuperBlock() {
v.dataFile.Write(header)
}
}
-func (v *Volume) readSuperBlock() error {
+func (v *Volume) readSuperBlock() (err error) {
v.dataFile.Seek(0, 0)
header := make([]byte, SuperBlockSize)
if _, e := v.dataFile.Read(header); e != nil {
return fmt.Errorf("cannot read superblock: %s", e)
}
- var err error
v.version, v.replicaType, err = ParseSuperBlock(header)
return err
}
-func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, e error) {
+func ParseSuperBlock(header []byte) (version Version, replicaType ReplicationType, err error) {
version = Version(header[0])
- var err error
+ if version == 0 {
+ err = errors.New("Zero version impossible - bad superblock!")
+ return
+ }
if replicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
- e = fmt.Errorf("cannot read replica type: %s", err)
+ err = fmt.Errorf("cannot read replica type: %s", err)
}
return
}
@@ -221,3 +250,39 @@ func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string)
func (v *Volume) ContentSize() uint64 {
return v.nm.fileByteCounter
}
+
+// Walk over the contained needles (call the function with each NeedleValue till error is returned)
+func (v *Volume) WalkValues(pedestrian func(*Needle) error) error {
+ pedplus := func(nv *NeedleValue) (err error) {
+ n := new(Needle)
+ if nv.Offset > 0 {
+ v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
+ if _, err = n.Read(v.dataFile, nv.Size, v.version); err != nil {
+ return
+ }
+ if err = pedestrian(n); err != nil {
+ return
+ }
+ }
+ return nil
+ }
+ return v.nm.Walk(pedplus)
+}
+
+// Walk over the keys
+func (v *Volume) WalkKeys(pedestrian func(Key) error) error {
+ pedplus := func(nv *NeedleValue) (err error) {
+ if nv.Offset > 0 && nv.Key > 0 {
+ if err = pedestrian(nv.Key); err != nil {
+ return
+ }
+ }
+ return nil
+ }
+ return v.nm.Walk(pedplus)
+}
+
+func (v *Volume) String() string {
+ return fmt.Sprintf("%d@%s:v%d:r%s", v.Id, v.dataFile.Name(),
+ v.Version(), v.replicaType)
+}
diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go
index bf7396673..0333c6cf0 100644
--- a/weed-fs/src/pkg/storage/volume_id.go
+++ b/weed-fs/src/pkg/storage/volume_id.go
@@ -1,17 +1,18 @@
package storage
import (
- "strconv"
+ "strconv"
)
type VolumeId uint32
-func NewVolumeId(vid string) (VolumeId,error) {
- volumeId, err := strconv.ParseUint(vid, 10, 64)
- return VolumeId(volumeId), err
+
+func NewVolumeId(vid string) (VolumeId, error) {
+ volumeId, err := strconv.ParseUint(vid, 10, 64)
+ return VolumeId(volumeId), err
}
-func (vid *VolumeId) String() string{
- return strconv.FormatUint(uint64(*vid), 10)
+func (vid *VolumeId) String() string {
+ return strconv.FormatUint(uint64(*vid), 10)
}
-func (vid *VolumeId) Next() VolumeId{
- return VolumeId(uint32(*vid)+1)
+func (vid *VolumeId) Next() VolumeId {
+ return VolumeId(uint32(*vid) + 1)
}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index e4c5f6ec4..845301670 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -10,4 +10,5 @@ type VolumeInfo struct {
FileCount int
DeleteCount int
DeletedByteCount uint64
+ Frozen bool
}
diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go
index da91ad038..9702ae904 100644
--- a/weed-fs/src/pkg/storage/volume_version.go
+++ b/weed-fs/src/pkg/storage/volume_version.go
@@ -1,12 +1,11 @@
package storage
-import (
-)
+import ()
type Version uint8
const (
- Version1 = Version(1)
- Version2 = Version(2)
- CurrentVersion = Version2
+ Version1 = Version(1)
+ Version2 = Version(2)
+ CurrentVersion = Version2
)
diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go
index 5542d1503..35d82c058 100644
--- a/weed-fs/src/pkg/topology/configuration_test.go
+++ b/weed-fs/src/pkg/topology/configuration_test.go
@@ -30,13 +30,13 @@ func TestLoadConfiguration(t *testing.T) {
</Configuration>
`
c, err := NewConfiguration([]byte(confContent))
-
- fmt.Printf("%s\n", c)
- if err!=nil{
- t.Fatalf("unmarshal error:%s",err.Error())
+
+ fmt.Printf("%s\n", c)
+ if err != nil {
+ t.Fatalf("unmarshal error:%s", err.Error())
}
-
+
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
- t.Fatalf("unmarshal error:%s",c)
+ t.Fatalf("unmarshal error:%s", c)
}
}
diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go
index 2ec68fd4b..a3b2b7d13 100644
--- a/weed-fs/src/pkg/topology/data_center.go
+++ b/weed-fs/src/pkg/topology/data_center.go
@@ -1,7 +1,6 @@
package topology
-import (
-)
+import ()
type DataCenter struct {
NodeImpl
@@ -12,31 +11,31 @@ func NewDataCenter(id string) *DataCenter {
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
- dc.NodeImpl.value = dc
+ dc.NodeImpl.value = dc
return dc
}
func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- if string(rack.Id()) == rackName {
- return rack
- }
- }
- rack := NewRack(rackName)
- dc.LinkChildNode(rack)
- return rack
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ if string(rack.Id()) == rackName {
+ return rack
+ }
+ }
+ rack := NewRack(rackName)
+ dc.LinkChildNode(rack)
+ return rack
}
-func (dc *DataCenter) ToMap() interface{}{
- m := make(map[string]interface{})
- m["Max"] = dc.GetMaxVolumeCount()
- m["Free"] = dc.FreeSpace()
- var racks []interface{}
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- racks = append(racks, rack.ToMap())
- }
- m["Racks"] = racks
- return m
+func (dc *DataCenter) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["Max"] = dc.GetMaxVolumeCount()
+ m["Free"] = dc.FreeSpace()
+ var racks []interface{}
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ racks = append(racks, rack.ToMap())
+ }
+ m["Racks"] = racks
+ return m
}
diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go
index 1d9e1891a..3115e0213 100644
--- a/weed-fs/src/pkg/topology/node_list.go
+++ b/weed-fs/src/pkg/topology/node_list.go
@@ -37,14 +37,14 @@ func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
list = append(list, n)
}
}
- if n > len(list){
- return nil,false
+ if n > len(list) {
+ return nil, false
}
for i := n; i > 0; i-- {
- r := rand.Intn(i)
- t := list[r]
- list[r] = list[i-1]
- list[i-1] = t
+ r := rand.Intn(i)
+ t := list[r]
+ list[r] = list[i-1]
+ list[i-1] = t
}
return list[len(list)-n:], true
}
diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go
index 0d16a0526..2fb4fa970 100644
--- a/weed-fs/src/pkg/topology/node_list_test.go
+++ b/weed-fs/src/pkg/topology/node_list_test.go
@@ -1,39 +1,39 @@
package topology
import (
+ _ "fmt"
"strconv"
"testing"
- _ "fmt"
)
func TestXYZ(t *testing.T) {
- topo := NewTopology("topo","/etc/weed.conf", "/tmp","test",234,5)
+ topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
for i := 0; i < 5; i++ {
dc := NewDataCenter("dc" + strconv.Itoa(i))
dc.activeVolumeCount = i
dc.maxVolumeCount = 5
topo.LinkChildNode(dc)
}
- nl := NewNodeList(topo.Children(),nil)
+ nl := NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(1)
- if !ret || len(picked)!=1 {
- t.Errorf("need to randomly pick 1 node")
- }
+ picked, ret := nl.RandomlyPickN(1)
+ if !ret || len(picked) != 1 {
+ t.Errorf("need to randomly pick 1 node")
+ }
picked, ret = nl.RandomlyPickN(4)
- if !ret || len(picked)!=4 {
- t.Errorf("need to randomly pick 4 nodes")
+ if !ret || len(picked) != 4 {
+ t.Errorf("need to randomly pick 4 nodes")
}
- picked, ret = nl.RandomlyPickN(5)
- if !ret || len(picked)!=5 {
- t.Errorf("need to randomly pick 5 nodes")
- }
+ picked, ret = nl.RandomlyPickN(5)
+ if !ret || len(picked) != 5 {
+ t.Errorf("need to randomly pick 5 nodes")
+ }
- picked, ret = nl.RandomlyPickN(6)
- if ret || len(picked)!=0 {
- t.Errorf("can not randomly pick 6 nodes:", ret, picked)
- }
+ picked, ret = nl.RandomlyPickN(6)
+ if ret || len(picked) != 0 {
+ t.Errorf("can not randomly pick 6 nodes:", ret, picked)
+ }
}
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
index 1555ef682..acc34417a 100644
--- a/weed-fs/src/pkg/topology/rack.go
+++ b/weed-fs/src/pkg/topology/rack.go
@@ -19,13 +19,13 @@ func NewRack(id string) *Rack {
}
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- return dn
- }
- }
- return nil
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ return dn
+ }
+ }
+ return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() {
diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go
index 83356b38c..71a901c8e 100644
--- a/weed-fs/src/pkg/topology/topo_test.go
+++ b/weed-fs/src/pkg/topology/topo_test.go
@@ -78,7 +78,7 @@ func setup(topologyLayout string) *Topology {
}
//need to connect all nodes first before server adding volumes
- topo := NewTopology("mynetwork","/etc/weed.conf","/tmp","test",234,5)
+ topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
@@ -94,7 +94,7 @@ func setup(topologyLayout string) *Topology {
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version:storage.CurrentVersion}
+ vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@@ -119,9 +119,9 @@ func TestRemoveDataCenter(t *testing.T) {
func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- rand.Seed(1)
+ rand.Seed(time.Now().UnixNano())
+ rand.Seed(1)
ret, node, vid := topo.RandomlyReserveOneVolume()
- fmt.Println("assigned :", ret, ", node :", node,", volume id:", vid)
+ fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
}
diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go
index 050fe4cd8..dee6514f4 100644
--- a/weed-fs/src/pkg/topology/topology_compact.go
+++ b/weed-fs/src/pkg/topology/topology_compact.go
@@ -101,10 +101,10 @@ type VacuumVolumeResult struct {
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
values := make(url.Values)
values.Add("volume", vid.String())
- values.Add("garbageThreshold", garbageThreshold)
+ values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
if err != nil {
- fmt.Println("parameters:",values)
+ fmt.Println("parameters:", values)
return err, false
}
var ret VacuumVolumeResult
diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go
index b33b4d768..debedc3d3 100644
--- a/weed-fs/src/pkg/topology/topology_event_handling.go
+++ b/weed-fs/src/pkg/topology/topology_event_handling.go
@@ -52,14 +52,14 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
vl := t.GetVolumeLayout(v.RepType)
vl.SetVolumeUnavailable(dn, v.Id)
}
- dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+ dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id())
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.RepType)
+ vl := t.GetVolumeLayout(v.RepType)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go
index 9ccf08ae3..b416ee943 100644
--- a/weed-fs/src/pkg/topology/topology_map.go
+++ b/weed-fs/src/pkg/topology/topology_map.go
@@ -1,7 +1,6 @@
package topology
-import (
-)
+import ()
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
index 314aca69f..141a40072 100644
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -5,12 +5,15 @@ import (
"fmt"
"math/rand"
"pkg/storage"
+ "sort"
)
+type volumeIdList []storage.VolumeId
+
type VolumeLayout struct {
repType storage.ReplicationType
vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
+ writables volumeIdList // transient (sorted!) array of writable volume Ids
pulse int64
volumeSizeLimit uint64
}
@@ -19,7 +22,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
return &VolumeLayout{
repType: repType,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
+ writables: make(volumeIdList, 0, 4),
pulse: pulse,
volumeSizeLimit: volumeSizeLimit,
}
@@ -33,13 +36,18 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
if vl.isWritable(v) {
vl.writables = append(vl.writables, v.Id)
+ if len(vl.writables) > 1 {
+ vl.writables.Sort()
+ }
}
}
}
}
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool{
- return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
+func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+ return !v.Frozen &&
+ uint64(v.Size) < vl.volumeSizeLimit &&
+ v.Version == storage.CurrentVersion
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
@@ -52,7 +60,13 @@ func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *Volume
fmt.Println("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- vid := vl.writables[rand.Intn(len_writers)]
+ var vid storage.VolumeId
+ if len_writers == 1 {
+ vid = vl.writables[0]
+ } else {
+ // skew for lesser indices
+ vid = vl.writables[rand.Intn(len_writers+1)%len_writers]
+ }
locationList := vl.vid2location[vid]
if locationList != nil {
return &vid, count, locationList, nil
@@ -80,8 +94,12 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
return false
}
}
+ // FIXME: how to refuse if volume is unwritable/frozen?
fmt.Println("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid)
+ if len(vl.writables) > 1 {
+ vl.writables.Sort()
+ }
return true
}
@@ -114,3 +132,18 @@ func (vl *VolumeLayout) ToMap() interface{} {
//m["locations"] = vl.vid2location
return m
}
+
+func (vls volumeIdList) Len() int { return len(vls) }
+
+func (vls volumeIdList) Less(i, j int) bool {
+ return vls[i] < vls[j]
+}
+
+func (vls volumeIdList) Swap(i, j int) {
+ vls[i], vls[j] = vls[j], vls[i]
+}
+
+// convienence sorting
+func (vls volumeIdList) Sort() {
+ sort.Sort(vls)
+}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
index 64d8cdf43..507a240b5 100644
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -15,7 +15,7 @@ func (dnll *VolumeLocationList) Head() *DataNode {
}
func (dnll *VolumeLocationList) Length() int {
- return len(dnll.list)
+ return len(dnll.list)
}
func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
@@ -29,13 +29,13 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
}
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
- for i, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- dnll.list = append(dnll.list[:i],dnll.list[i+1:]...)
- return true
- }
- }
- return false
+ for i, dnl := range dnll.list {
+ if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
+ dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
+ return true
+ }
+ }
+ return false
}
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go
index 177da06db..6cc3d7018 100644
--- a/weed-fs/src/pkg/util/bytes.go
+++ b/weed-fs/src/pkg/util/bytes.go
@@ -1,34 +1,33 @@
package util
-func BytesToUint64(b []byte)(v uint64){
- length := uint(len(b))
- for i :=uint(0);i<length-1;i++ {
- v += uint64(b[i])
- v <<= 8
- }
- v+=uint64(b[length-1])
- return
+func BytesToUint64(b []byte) (v uint64) {
+ length := uint(len(b))
+ for i := uint(0); i < length-1; i++ {
+ v += uint64(b[i])
+ v <<= 8
+ }
+ v += uint64(b[length-1])
+ return
}
-func BytesToUint32(b []byte)(v uint32){
- length := uint(len(b))
- for i :=uint(0);i<length-1;i++ {
- v += uint32(b[i])
- v <<= 8
- }
- v+=uint32(b[length-1])
- return
+func BytesToUint32(b []byte) (v uint32) {
+ length := uint(len(b))
+ for i := uint(0); i < length-1; i++ {
+ v += uint32(b[i])
+ v <<= 8
+ }
+ v += uint32(b[length-1])
+ return
}
-func Uint64toBytes(b []byte, v uint64){
- for i :=uint(0);i<8;i++ {
- b[7-i] = byte(v>>(i*8))
- }
+func Uint64toBytes(b []byte, v uint64) {
+ for i := uint(0); i < 8; i++ {
+ b[7-i] = byte(v >> (i * 8))
+ }
}
-func Uint32toBytes(b []byte, v uint32){
- for i :=uint(0);i<4;i++ {
- b[3-i] = byte(v>>(i*8))
- }
+func Uint32toBytes(b []byte, v uint32) {
+ for i := uint(0); i < 4; i++ {
+ b[3-i] = byte(v >> (i * 8))
+ }
}
-func Uint8toBytes(b []byte, v uint8){
- b[0] = byte(v)
+func Uint8toBytes(b []byte, v uint8) {
+ b[0] = byte(v)
}
-
diff --git a/weed-fs/src/pkg/util/file.go b/weed-fs/src/pkg/util/file.go
new file mode 100644
index 000000000..bf3ea66de
--- /dev/null
+++ b/weed-fs/src/pkg/util/file.go
@@ -0,0 +1,63 @@
+package util
+
+import (
+ "errors"
+ "log"
+ "os"
+)
+
+// sets file (fh if not nil, otherwise fileName) permission to mask
+// it will
+// AND with the permission iff direction < 0
+// OR with the permission iff direction > 0
+// otherwise it will SET the permission to the mask
+func SetFilePerm(fh *os.File, fileName string, mask os.FileMode, direction int8) (err error) {
+ var stat os.FileInfo
+ if fh == nil {
+ stat, err = os.Stat(fileName)
+ } else {
+ stat, err = fh.Stat()
+ }
+ if err != nil {
+ return err
+ }
+
+ mode := stat.Mode() & ^os.ModePerm
+ // log.Printf("mode1=%d mask=%d", mode, mask)
+ if direction == 0 {
+ mode |= mask
+ } else if direction > 0 {
+ mode |= stat.Mode().Perm() | mask
+ } else {
+ mode |= stat.Mode().Perm() & mask
+ }
+ log.Printf("pmode=%d operm=%d => nmode=%d nperm=%d",
+ stat.Mode(), stat.Mode()&os.ModePerm,
+ mode, mode&os.ModePerm)
+ if mode == 0 {
+ return errors.New("Zero FileMode")
+ }
+ if fh == nil {
+ err = os.Chmod(fileName, mode)
+ } else {
+ err = fh.Chmod(mode)
+ }
+ return err
+}
+
+// returns whether the filename exists - errors doesn't mean not exists!
+func FileExists(fileName string) bool {
+ if _, e := os.Stat(fileName); e != nil && os.IsNotExist(e) {
+ return false
+ }
+ return true
+}
+
+// returns whether the filename is POSSIBLY writable
+//- whether it has some kind of writable bit set
+func FileIsWritable(fileName string) bool {
+ if stat, e := os.Stat(fileName); e == nil {
+ return stat.Mode().Perm()&0222 > 0
+ }
+ return false
+}
diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go
index 6a4350e72..930da9522 100644
--- a/weed-fs/src/pkg/util/parse.go
+++ b/weed-fs/src/pkg/util/parse.go
@@ -1,16 +1,16 @@
package util
import (
- "strconv"
+ "strconv"
)
-func ParseInt(text string, defaultValue int) int{
- count, parseError := strconv.ParseUint(text,10,64)
- if parseError!=nil {
- if len(text)>0{
- return 0
- }
- return defaultValue
- }
- return int(count)
+func ParseInt(text string, defaultValue int) int {
+ count, parseError := strconv.ParseUint(text, 10, 64)
+ if parseError != nil {
+ if len(text) > 0 {
+ return 0
+ }
+ return defaultValue
+ }
+ return int(count)
}
diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go
index f643faa6b..6e6ab0003 100644
--- a/weed-fs/src/pkg/util/post.go
+++ b/weed-fs/src/pkg/util/post.go
@@ -16,7 +16,7 @@ func Post(url string, values url.Values) ([]byte, error) {
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
- log.Println("read post result from", url, err)
+ log.Println("read post result from", url, err)
return nil, err
}
return b, nil