Diffstat (limited to 'weed-fs/src/pkg')
-rw-r--r--  weed-fs/src/pkg/directory/file_id.go | 38
-rw-r--r--  weed-fs/src/pkg/operation/allocate_volume.go | 32
-rw-r--r--  weed-fs/src/pkg/operation/delete_content.go | 16
-rw-r--r--  weed-fs/src/pkg/operation/lookup_volume_id.go | 38
-rw-r--r--  weed-fs/src/pkg/operation/upload_content.go | 47
-rw-r--r--  weed-fs/src/pkg/replication/volume_growth.go | 195
-rw-r--r--  weed-fs/src/pkg/replication/volume_growth_test.go | 129
-rw-r--r--  weed-fs/src/pkg/sequence/sequence.go | 71
-rw-r--r--  weed-fs/src/pkg/storage/compact_map.go | 182
-rw-r--r--  weed-fs/src/pkg/storage/compact_map_perf_test.go | 43
-rw-r--r--  weed-fs/src/pkg/storage/compact_map_test.go | 63
-rw-r--r--  weed-fs/src/pkg/storage/compress.go | 57
-rw-r--r--  weed-fs/src/pkg/storage/crc.go | 21
-rw-r--r--  weed-fs/src/pkg/storage/needle.go | 132
-rw-r--r--  weed-fs/src/pkg/storage/needle_map.go | 99
-rw-r--r--  weed-fs/src/pkg/storage/needle_read_write.go | 238
-rw-r--r--  weed-fs/src/pkg/storage/replication_type.go | 123
-rw-r--r--  weed-fs/src/pkg/storage/sample.idx | bin 27140560 -> 0 bytes
-rw-r--r--  weed-fs/src/pkg/storage/store.go | 204
-rw-r--r--  weed-fs/src/pkg/storage/volume.go | 274
-rw-r--r--  weed-fs/src/pkg/storage/volume_id.go | 18
-rw-r--r--  weed-fs/src/pkg/storage/volume_info.go | 13
-rw-r--r--  weed-fs/src/pkg/storage/volume_version.go | 11
-rw-r--r--  weed-fs/src/pkg/topology/configuration.go | 56
-rw-r--r--  weed-fs/src/pkg/topology/configuration_test.go | 42
-rw-r--r--  weed-fs/src/pkg/topology/data_center.go | 41
-rw-r--r--  weed-fs/src/pkg/topology/data_node.go | 60
-rw-r--r--  weed-fs/src/pkg/topology/node.go | 200
-rw-r--r--  weed-fs/src/pkg/topology/node_list.go | 69
-rw-r--r--  weed-fs/src/pkg/topology/node_list_test.go | 39
-rw-r--r--  weed-fs/src/pkg/topology/rack.go | 64
-rw-r--r--  weed-fs/src/pkg/topology/topo_test.go | 127
-rw-r--r--  weed-fs/src/pkg/topology/topology.go | 148
-rw-r--r--  weed-fs/src/pkg/topology/topology_compact.go | 150
-rw-r--r--  weed-fs/src/pkg/topology/topology_event_handling.go | 67
-rw-r--r--  weed-fs/src/pkg/topology/topology_map.go | 50
-rw-r--r--  weed-fs/src/pkg/topology/volume_layout.go | 116
-rw-r--r--  weed-fs/src/pkg/topology/volume_location.go | 58
-rw-r--r--  weed-fs/src/pkg/util/bytes.go | 33
-rw-r--r--  weed-fs/src/pkg/util/config.go | 128
-rw-r--r--  weed-fs/src/pkg/util/parse.go | 16
-rw-r--r--  weed-fs/src/pkg/util/post.go | 23
42 files changed, 0 insertions, 3531 deletions
diff --git a/weed-fs/src/pkg/directory/file_id.go b/weed-fs/src/pkg/directory/file_id.go
deleted file mode 100644
index cd4204f32..000000000
--- a/weed-fs/src/pkg/directory/file_id.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package directory
-
-import (
- "encoding/hex"
- "pkg/storage"
- "pkg/util"
- "strings"
-)
-
-type FileId struct {
- VolumeId storage.VolumeId
- Key uint64
- Hashcode uint32
-}
-
-func NewFileId(VolumeId storage.VolumeId, Key uint64, Hashcode uint32) *FileId {
- return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
-}
-func ParseFileId(fid string) *FileId {
- a := strings.Split(fid, ",")
- if len(a) != 2 {
- println("Invalid fid", fid, ", split length", len(a))
- return nil
- }
- vid_string, key_hash_string := a[0], a[1]
- volumeId, _ := storage.NewVolumeId(vid_string)
- key, hash := storage.ParseKeyHash(key_hash_string)
- return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}
-}
-func (n *FileId) String() string {
- bytes := make([]byte, 12)
- util.Uint64toBytes(bytes[0:8], n.Key)
- util.Uint32toBytes(bytes[8:12], n.Hashcode)
- nonzero_index := 0
- for ; bytes[nonzero_index] == 0; nonzero_index++ {
- }
- return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:])
-}
diff --git a/weed-fs/src/pkg/operation/allocate_volume.go b/weed-fs/src/pkg/operation/allocate_volume.go
deleted file mode 100644
index c93ccfb62..000000000
--- a/weed-fs/src/pkg/operation/allocate_volume.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- "net/url"
- "pkg/storage"
- "pkg/topology"
- "pkg/util"
-)
-
-type AllocateVolumeResult struct {
- Error string
-}
-
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("replicationType", repType.String())
- jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
- if err != nil {
- return err
- }
- var ret AllocateVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go
deleted file mode 100644
index 2bdb49651..000000000
--- a/weed-fs/src/pkg/operation/delete_content.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package operation
-
-import (
- "log"
- "net/http"
-)
-
-func Delete(url string) error {
- req, err := http.NewRequest("DELETE", url, nil)
- if err != nil {
- log.Println("failing to delete", url)
- return err
- }
- _, err = http.DefaultClient.Do(req)
- return err
-}
diff --git a/weed-fs/src/pkg/operation/lookup_volume_id.go b/weed-fs/src/pkg/operation/lookup_volume_id.go
deleted file mode 100644
index 50a6d91e6..000000000
--- a/weed-fs/src/pkg/operation/lookup_volume_id.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "errors"
- _ "fmt"
- "net/url"
- "pkg/storage"
- "pkg/util"
-)
-
-type Location struct {
- Url string "url"
- PublicUrl string "publicUrl"
-}
-type LookupResult struct {
- Locations []Location "locations"
- Error string "error"
-}
-
-//TODO: Add caching for vid here
-func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
- values := make(url.Values)
- values.Add("volumeId", vid.String())
- jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
- if err != nil {
- return nil, err
- }
- var ret LookupResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- if ret.Error != "" {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}
diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go
deleted file mode 100644
index 0bdb697da..000000000
--- a/weed-fs/src/pkg/operation/upload_content.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package operation
-
-import (
- "bytes"
- "encoding/json"
- "errors"
- _ "fmt"
- "io"
- "io/ioutil"
- "log"
- "mime/multipart"
- "net/http"
-)
-
-type UploadResult struct {
- Size int
- Error string
-}
-
-func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
- file_writer, err := body_writer.CreateFormFile("file", filename)
- io.Copy(file_writer, reader)
- content_type := body_writer.FormDataContentType()
- body_writer.Close()
- resp, err := http.Post(uploadUrl, content_type, body_buf)
- if err != nil {
- log.Println("failing to upload to", uploadUrl)
- return nil, err
- }
- defer resp.Body.Close()
- resp_body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return nil, err
- }
- var ret UploadResult
- err = json.Unmarshal(resp_body, &ret)
- if err != nil {
- log.Println("failing to read upload resonse", uploadUrl, resp_body)
- return nil, err
- }
- if ret.Error != "" {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
deleted file mode 100644
index ce0094a7c..000000000
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ /dev/null
@@ -1,195 +0,0 @@
-package replication
-
-import (
- "errors"
- "fmt"
- "math/rand"
- "pkg/operation"
- "pkg/storage"
- "pkg/topology"
- "sync"
-)
-
-/*
-This package is created to resolve these replica placement issues:
-1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
-2. in time of tight storage, how to reduce replica level
-3. optimizing for hot data on faster disk, cold data on cheaper storage,
-4. volume allocation for each bucket
-*/
-
-type VolumeGrowth struct {
- copy1factor int
- copy2factor int
- copy3factor int
- copyAll int
-
- accessLock sync.Mutex
-}
-
-func NewDefaultVolumeGrowth() *VolumeGrowth {
- return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
-}
-
-func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
- switch repType {
- case storage.Copy000:
- return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
- case storage.Copy001:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
- case storage.Copy010:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
- case storage.Copy100:
- return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
- case storage.Copy110:
- return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
- case storage.Copy200:
- return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
- }
- return 0, errors.New("Unknown Replication Type!")
-}
-func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
- vg.accessLock.Lock()
- defer vg.accessLock.Unlock()
-
- counter = 0
- switch repType {
- case storage.Copy000:
- for i := 0; i < count; i++ {
- if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
- if err = vg.grow(topo, *vid, repType, server); err == nil {
- counter++
- }
- }
- }
- case storage.Copy001:
- for i := 0; i < count; i++ {
- //randomly pick one server, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
- rack := server1.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[server1.String()] = server1
- newNodeList := topology.NewNodeList(rack.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
- counter++
- }
- }
- }
- }
- }
- case storage.Copy010:
- for i := 0; i < count; i++ {
- //randomly pick one server, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
- rack := server1.Parent()
- dc := rack.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[rack.String()] = rack
- newNodeList := topology.NewNodeList(dc.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
- counter++
- }
- }
- }
- }
- }
- case storage.Copy100:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 1)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- for _, n := range picked {
- if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
- servers = append(servers, server)
- }
- }
- }
- if len(servers) == 2 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
- counter++
- }
- }
- }
- }
- case storage.Copy110:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 2)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- dc1, dc2 := picked[0], picked[1]
- if dc2.FreeSpace() > dc1.FreeSpace() {
- dc1, dc2 = dc2, dc1
- }
- if dc1.FreeSpace() > 0 {
- if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
- servers = append(servers, server1)
- rack := server1.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[rack.String()] = rack
- newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
- servers = append(servers, server2)
- }
- }
- }
- }
- if dc2.FreeSpace() > 0 {
- if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
- servers = append(servers, server)
- }
- }
- if len(servers) == 3 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
- counter++
- }
- }
- }
- }
- case storage.Copy200:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(3, 1)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- for _, n := range picked {
- if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
- servers = append(servers, server)
- }
- }
- }
- if len(servers) == 3 {
- if err = vg.grow(topo, vid, repType, servers...); err == nil {
- counter++
- }
- }
- }
- }
- }
- return
-}
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
- for _, server := range servers {
- if err := operation.AllocateVolume(server, vid, repType); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- topo.RegisterVolumeLayout(&vi, server)
- fmt.Println("Created Volume", vid, "on", server)
- } else {
- fmt.Println("Failed to assign", vid, "to", servers)
- return errors.New("Failed to assign " + vid.String())
- }
- }
- return nil
-}
diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go
deleted file mode 100644
index 659564c64..000000000
--- a/weed-fs/src/pkg/replication/volume_growth_test.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package replication
-
-import (
- "encoding/json"
- "fmt"
- "math/rand"
- "pkg/storage"
- "pkg/topology"
- "testing"
- "time"
-)
-
-var topologyLayout = `
-{
- "dc1":{
- "rack1":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server2":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server1":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server2":{
- "volumes":[],
- "limit":4
- },
- "server3":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":2
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
-}
-`
-
-func setup(topologyLayout string) *topology.Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
- fmt.Println("data:", data)
-
- //need to connect all nodes first before servers add volumes
- topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := topology.NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- rack := topology.NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(rack)
- for serverKey, serverValue := range rackMap {
- server := topology.NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- rack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- }
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
- }
- }
- }
-
- return topo
-}
-
-func TestRemoveDataCenter(t *testing.T) {
- topo := setup(topologyLayout)
- topo.UnlinkChildNode(topology.NodeId("dc2"))
- if topo.GetActiveVolumeCount() != 15 {
- t.Fail()
- }
- topo.UnlinkChildNode(topology.NodeId("dc3"))
- if topo.GetActiveVolumeCount() != 12 {
- t.Fail()
- }
-}
-
-func TestReserveOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
- if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
- t.Log("reserved", c)
- }
-}
diff --git a/weed-fs/src/pkg/sequence/sequence.go b/weed-fs/src/pkg/sequence/sequence.go
deleted file mode 100644
index c85289468..000000000
--- a/weed-fs/src/pkg/sequence/sequence.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package sequence
-
-import (
- "encoding/gob"
- "log"
- "os"
- "path"
- "sync"
-)
-
-const (
- FileIdSaveInterval = 10000
-)
-
-type Sequencer interface {
- NextFileId(count int) (uint64, int)
-}
-type SequencerImpl struct {
- dir string
- fileName string
-
- volumeLock sync.Mutex
- sequenceLock sync.Mutex
-
- FileIdSequence uint64
- fileIdCounter uint64
-}
-
-func NewSequencer(dirname string, filename string) (m *SequencerImpl) {
- m = &SequencerImpl{dir: dirname, fileName: filename}
-
- seqFile, se := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_RDONLY, 0644)
- if se != nil {
- m.FileIdSequence = FileIdSaveInterval
- log.Println("Setting file id sequence", m.FileIdSequence)
- } else {
- decoder := gob.NewDecoder(seqFile)
- defer seqFile.Close()
- decoder.Decode(&m.FileIdSequence)
- log.Println("Loading file id sequence", m.FileIdSequence, "=>", m.FileIdSequence+FileIdSaveInterval)
- //in case the server stops between intervals
- m.FileIdSequence += FileIdSaveInterval
- }
- return
-}
-
-//count should be 1 or more
-func (m *SequencerImpl) NextFileId(count int) (uint64, int) {
- if count <= 0 {
- return 0, 0
- }
- m.sequenceLock.Lock()
- defer m.sequenceLock.Unlock()
- if m.fileIdCounter < uint64(count) {
- m.fileIdCounter = FileIdSaveInterval
- m.FileIdSequence += FileIdSaveInterval
- m.saveSequence()
- }
- m.fileIdCounter = m.fileIdCounter - uint64(count)
- return m.FileIdSequence - m.fileIdCounter, count
-}
-func (m *SequencerImpl) saveSequence() {
- log.Println("Saving file id sequence", m.FileIdSequence, "to", path.Join(m.dir, m.fileName+".seq"))
- seqFile, e := os.OpenFile(path.Join(m.dir, m.fileName+".seq"), os.O_CREATE|os.O_WRONLY, 0644)
- if e != nil {
- log.Fatalf("Sequence File Save [ERROR] %s\n", e)
- }
- defer seqFile.Close()
- encoder := gob.NewEncoder(seqFile)
- encoder.Encode(m.FileIdSequence)
-}
diff --git a/weed-fs/src/pkg/storage/compact_map.go b/weed-fs/src/pkg/storage/compact_map.go
deleted file mode 100644
index 0b33961c4..000000000
--- a/weed-fs/src/pkg/storage/compact_map.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package storage
-
-import ()
-
-type NeedleValue struct {
- Key Key
- Offset uint32 "Volume offset" //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 "Size of the data portion"
-}
-
-const (
- batch = 100000
-)
-
-type Key uint64
-
-type CompactSection struct {
- values []NeedleValue
- overflow map[Key]NeedleValue
- start Key
- end Key
- counter int
-}
-
-func NewCompactSection(start Key) CompactSection {
- return CompactSection{
- values: make([]NeedleValue, batch),
- overflow: make(map[Key]NeedleValue),
- start: start,
- }
-}
-
-//return old entry size
-func (cs *CompactSection) Set(key Key, offset uint32, size uint32) uint32 {
- ret := uint32(0)
- if key > cs.end {
- cs.end = key
- }
- if i := cs.binarySearchValues(key); i >= 0 {
- ret = cs.values[i].Size
- //println("key", key, "old size", ret)
- cs.values[i].Offset, cs.values[i].Size = offset, size
- } else {
- needOverflow := cs.counter >= batch
- needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > key
- if needOverflow {
- //println("start", cs.start, "counter", cs.counter, "key", key)
- if oldValue, found := cs.overflow[key]; found {
- ret = oldValue.Size
- }
- cs.overflow[key] = NeedleValue{Key: key, Offset: offset, Size: size}
- } else {
- p := &cs.values[cs.counter]
- p.Key, p.Offset, p.Size = key, offset, size
- //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
- cs.counter++
- }
- }
- return ret
-}
-
-//return old entry size
-func (cs *CompactSection) Delete(key Key) uint32 {
- ret := uint32(0)
- if i := cs.binarySearchValues(key); i >= 0 {
- if cs.values[i].Size > 0 {
- ret = cs.values[i].Size
- cs.values[i].Size = 0
- }
- }
- if v, found := cs.overflow[key]; found {
- delete(cs.overflow, key)
- ret = v.Size
- }
- return ret
-}
-func (cs *CompactSection) Get(key Key) (*NeedleValue, bool) {
- if v, ok := cs.overflow[key]; ok {
- return &v, true
- }
- if i := cs.binarySearchValues(key); i >= 0 {
- return &cs.values[i], true
- }
- return nil, false
-}
-func (cs *CompactSection) binarySearchValues(key Key) int {
- l, h := 0, cs.counter-1
- if h >= 0 && cs.values[h].Key < key {
- return -2
- }
- //println("looking for key", key)
- for l <= h {
- m := (l + h) / 2
- //println("mid", m, "key", cs.values[m].Key, cs.values[m].Offset, cs.values[m].Size)
- if cs.values[m].Key < key {
- l = m + 1
- } else if key < cs.values[m].Key {
- h = m - 1
- } else {
- //println("found", m)
- return m
- }
- }
- return -1
-}
-
-//This map assumes mostly inserting increasing keys
-type CompactMap struct {
- list []CompactSection
-}
-
-func NewCompactMap() CompactMap {
- return CompactMap{}
-}
-
-func (cm *CompactMap) Set(key Key, offset uint32, size uint32) uint32 {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- //println(x, "creating", len(cm.list), "section1, starting", key)
- cm.list = append(cm.list, NewCompactSection(key))
- x = len(cm.list) - 1
- }
- return cm.list[x].Set(key, offset, size)
-}
-func (cm *CompactMap) Delete(key Key) uint32 {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- return uint32(0)
- }
- return cm.list[x].Delete(key)
-}
-func (cm *CompactMap) Get(key Key) (*NeedleValue, bool) {
- x := cm.binarySearchCompactSection(key)
- if x < 0 {
- return nil, false
- }
- return cm.list[x].Get(key)
-}
-func (cm *CompactMap) binarySearchCompactSection(key Key) int {
- l, h := 0, len(cm.list)-1
- if h < 0 {
- return -5
- }
- if cm.list[h].start <= key {
- if cm.list[h].counter < batch || key <= cm.list[h].end {
- return h
- } else {
- return -4
- }
- }
- for l <= h {
- m := (l + h) / 2
- if key < cm.list[m].start {
- h = m - 1
- } else { // cm.list[m].start <= key
- if cm.list[m+1].start <= key {
- l = m + 1
- } else {
- return m
- }
- }
- }
- return -3
-}
-
-func (cm *CompactMap) Visit(visit func(NeedleValue) error) error {
- for _, cs := range cm.list {
- for _, v := range cs.overflow {
- if err := visit(v); err != nil {
- return err
- }
- }
- for _, v := range cs.values {
- if _, found := cs.overflow[v.Key]; !found {
- if err := visit(v); err != nil {
- return err
- }
- }
- }
- }
- return nil
-}
diff --git a/weed-fs/src/pkg/storage/compact_map_perf_test.go b/weed-fs/src/pkg/storage/compact_map_perf_test.go
deleted file mode 100644
index cfa521fc8..000000000
--- a/weed-fs/src/pkg/storage/compact_map_perf_test.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package storage
-
-import (
- "log"
- "os"
- "pkg/util"
- "testing"
-)
-
-func TestMemoryUsage(t *testing.T) {
-
- indexFile, ie := os.OpenFile("sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
- if ie != nil {
- log.Fatalln(ie)
- }
- LoadNewNeedleMap(indexFile)
-
-}
-
-func LoadNewNeedleMap(file *os.File) CompactMap {
- m := NewCompactMap()
- bytes := make([]byte, 16*1024)
- count, e := file.Read(bytes)
- if count > 0 {
- fstat, _ := file.Stat()
- log.Println("Loading index file", fstat.Name(), "size", fstat.Size())
- }
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- if offset > 0 {
- m.Set(Key(key), offset, size)
- } else {
- //delete(m, key)
- }
- }
-
- count, e = file.Read(bytes)
- }
- return m
-}
diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go
deleted file mode 100644
index e76e9578d..000000000
--- a/weed-fs/src/pkg/storage/compact_map_test.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package storage
-
-import (
- "testing"
-)
-
-func TestXYZ(t *testing.T) {
- m := NewCompactMap()
- for i := uint32(0); i < 100*batch; i += 2 {
- m.Set(Key(i), i, i)
- }
-
- for i := uint32(0); i < 100*batch; i += 37 {
- m.Delete(Key(i))
- }
-
- for i := uint32(0); i < 10*batch; i += 3 {
- m.Set(Key(i), i+11, i+5)
- }
-
- // for i := uint32(0); i < 100; i++ {
- // if v := m.Get(Key(i)); v != nil {
- // println(i, "=", v.Key, v.Offset, v.Size)
- // }
- // }
-
- for i := uint32(0); i < 10*batch; i++ {
- v, ok := m.Get(Key(i))
- if i%3 == 0 {
- if !ok {
- t.Fatal("key", i, "missing!")
- }
- if v.Size != i+5 {
- t.Fatal("key", i, "size", v.Size)
- }
- } else if i%37 == 0 {
- if ok && v.Size > 0 {
- t.Fatal("key", i, "should have been deleted needle value", v)
- }
- } else if i%2 == 0 {
- if v.Size != i {
- t.Fatal("key", i, "size", v.Size)
- }
- }
- }
-
- for i := uint32(10 * batch); i < 100*batch; i++ {
- v, ok := m.Get(Key(i))
- if i%37 == 0 {
- if ok && v.Size > 0 {
- t.Fatal("key", i, "should have been deleted needle value", v)
- }
- } else if i%2 == 0 {
- if v == nil {
- t.Fatal("key", i, "missing")
- }
- if v.Size != i {
- t.Fatal("key", i, "size", v.Size)
- }
- }
- }
-
-}
diff --git a/weed-fs/src/pkg/storage/compress.go b/weed-fs/src/pkg/storage/compress.go
deleted file mode 100644
index 256789c9c..000000000
--- a/weed-fs/src/pkg/storage/compress.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package storage
-
-import (
- "bytes"
- "compress/flate"
- "compress/gzip"
- "io/ioutil"
- "strings"
-)
-
-/*
-* By default, do not gzip, since gzip can be done on the client side.
-*/
-func IsGzippable(ext, mtype string) bool {
- if strings.HasPrefix(mtype, "text/") {
- return true
- }
- switch ext {
- case ".zip", ".rar", ".gz", ".bz2", ".xz":
- return false
- case ".pdf", ".txt", ".html", ".css", ".js", ".json":
- return true
- }
- if strings.HasPrefix(mtype, "application/") {
- if strings.HasSuffix(mtype, "xml") {
- return true
- }
- if strings.HasSuffix(mtype, "script") {
- return true
- }
- }
- return false
-}
-
-func GzipData(input []byte) ([]byte, error) {
- buf := new(bytes.Buffer)
- w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
- if _, err := w.Write(input); err != nil {
- println("error compressing data:", err)
- return nil, err
- }
- if err := w.Close(); err != nil {
- println("error closing compressed data:", err)
- return nil, err
- }
- return buf.Bytes(), nil
-}
-func UnGzipData(input []byte) ([]byte, error) {
- buf := bytes.NewBuffer(input)
- r, _ := gzip.NewReader(buf)
- defer r.Close()
- output, err := ioutil.ReadAll(r)
- if err != nil {
- println("error uncompressing data:", err)
- }
- return output, err
-}
diff --git a/weed-fs/src/pkg/storage/crc.go b/weed-fs/src/pkg/storage/crc.go
deleted file mode 100644
index 198352e68..000000000
--- a/weed-fs/src/pkg/storage/crc.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package storage
-
-import (
- "hash/crc32"
-)
-
-var table = crc32.MakeTable(crc32.Castagnoli)
-
-type CRC uint32
-
-func NewCRC(b []byte) CRC {
- return CRC(0).Update(b)
-}
-
-func (c CRC) Update(b []byte) CRC {
- return CRC(crc32.Update(uint32(c), table, b))
-}
-
-func (c CRC) Value() uint32 {
- return uint32(c>>15|c<<17) + 0xa282ead8
-}
diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go
deleted file mode 100644
index 2ab671426..000000000
--- a/weed-fs/src/pkg/storage/needle.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package storage
-
-import (
- "encoding/hex"
- "fmt"
- "io/ioutil"
- "mime"
- "net/http"
- "path"
- "pkg/util"
- "strconv"
- "strings"
-)
-
-const (
- NeedleHeaderSize = 16 //should never change this
- NeedlePaddingSize = 8
- NeedleChecksumSize = 4
-)
-
-type Needle struct {
- Cookie uint32 "random number to mitigate brute force lookups"
- Id uint64 "needle id"
- Size uint32 "sum of DataSize,Data,NameSize,Name,MimeSize,Mime"
-
- DataSize uint32 "Data size" //version2
- Data []byte "The actual file data"
- Flags byte "boolean flags" //version2
- NameSize uint8 //version2
- Name []byte "maximum 256 characters" //version2
- MimeSize uint8 //version2
- Mime []byte "maximum 256 characters" //version2
-
- Checksum CRC "CRC32 to check integrity"
- Padding []byte "Aligned to 8 bytes"
-}
-
-func NewNeedle(r *http.Request) (n *Needle, fname string, e error) {
-
- n = new(Needle)
- form, fe := r.MultipartReader()
- if fe != nil {
- fmt.Println("MultipartReader [ERROR]", fe)
- e = fe
- return
- }
- part, fe := form.NextPart()
- if fe != nil {
- fmt.Println("Reading Multi part [ERROR]", fe)
- e = fe
- return
- }
- fname = part.FileName()
- fname = path.Base(fname)
- data, _ := ioutil.ReadAll(part)
- dotIndex := strings.LastIndex(fname, ".")
- ext, mtype := "", ""
- if dotIndex > 0 {
- ext = fname[dotIndex:]
- mtype = mime.TypeByExtension(ext)
- }
- contentType := part.Header.Get("Content-Type")
- if contentType != "" && mtype != contentType && len(contentType) < 256 {
- n.Mime = []byte(contentType)
- n.SetHasMime()
- mtype = contentType
- }
- if IsGzippable(ext, mtype) {
- if data, e = GzipData(data); e != nil {
- return
- }
- n.SetGzipped()
- }
- if ext == ".gz" {
- n.SetGzipped()
- }
- if len(fname) < 256 {
- if strings.HasSuffix(fname, ".gz") {
- n.Name = []byte(fname[:len(fname)-3])
- } else {
- n.Name = []byte(fname)
- }
- n.SetHasName()
- }
-
- n.Data = data
- n.Checksum = NewCRC(data)
-
- commaSep := strings.LastIndex(r.URL.Path, ",")
- dotSep := strings.LastIndex(r.URL.Path, ".")
- fid := r.URL.Path[commaSep+1:]
- if dotSep > 0 {
- fid = r.URL.Path[commaSep+1 : dotSep]
- }
-
- n.ParsePath(fid)
-
- return
-}
-func (n *Needle) ParsePath(fid string) {
- length := len(fid)
- if length <= 8 {
- if length > 0 {
- println("Invalid fid", fid, "length", length)
- }
- return
- }
- delta := ""
- deltaIndex := strings.LastIndex(fid, "_")
- if deltaIndex > 0 {
- fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
- }
- n.Id, n.Cookie = ParseKeyHash(fid)
- if delta != "" {
- d, e := strconv.ParseUint(delta, 10, 64)
- if e == nil {
- n.Id += d
- }
- }
-}
-
-func ParseKeyHash(key_hash_string string) (uint64, uint32) {
- key_hash_bytes, khe := hex.DecodeString(key_hash_string)
- key_hash_len := len(key_hash_bytes)
- if khe != nil || key_hash_len <= 4 {
- println("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe)
- return 0, 0
- }
- key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
- hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
- return key, hash
-}
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
deleted file mode 100644
index 563e1081b..000000000
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ /dev/null
@@ -1,99 +0,0 @@
-package storage
-
-import (
- //"log"
- "os"
- "pkg/util"
-)
-
-type NeedleMap struct {
- indexFile *os.File
- m CompactMap
-
- //transient
- bytes []byte
-
- deletionCounter int
- fileCounter int
- deletionByteCounter uint64
- fileByteCounter uint64
-}
-
-func NewNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: NewCompactMap(),
- bytes: make([]byte, 16),
- indexFile: file,
- }
- return nm
-}
-
-const (
- RowsToRead = 1024
-)
-
-func LoadNeedleMap(file *os.File) *NeedleMap {
- nm := NewNeedleMap(file)
- bytes := make([]byte, 16*RowsToRead)
- count, e := nm.indexFile.Read(bytes)
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- nm.fileCounter++
- nm.fileByteCounter = nm.fileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- }
-
- count, e = nm.indexFile.Read(bytes)
- }
- return nm
-}
-
-func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
- oldSize := nm.m.Set(Key(key), offset, size)
- util.Uint64toBytes(nm.bytes[0:8], key)
- util.Uint32toBytes(nm.bytes[8:12], offset)
- util.Uint32toBytes(nm.bytes[12:16], size)
- nm.fileCounter++
- nm.fileByteCounter = nm.fileByteCounter + uint64(size)
- if oldSize > 0 {
- nm.deletionCounter++
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(oldSize)
- }
- return nm.indexFile.Write(nm.bytes)
-}
-func (nm *NeedleMap) Get(key uint64) (element *NeedleValue, ok bool) {
- element, ok = nm.m.Get(Key(key))
- return
-}
-func (nm *NeedleMap) Delete(key uint64) {
- nm.deletionByteCounter = nm.deletionByteCounter + uint64(nm.m.Delete(Key(key)))
- util.Uint64toBytes(nm.bytes[0:8], key)
- util.Uint32toBytes(nm.bytes[8:12], 0)
- util.Uint32toBytes(nm.bytes[12:16], 0)
- nm.indexFile.Write(nm.bytes)
- nm.deletionCounter++
-}
-func (nm *NeedleMap) Close() {
- nm.indexFile.Close()
-}
-func (nm *NeedleMap) ContentSize() uint64 {
- return nm.fileByteCounter
-}
-func (nm *NeedleMap) Visit(visit func(NeedleValue) error) (err error) {
- return nm.m.Visit(visit)
-}
diff --git a/weed-fs/src/pkg/storage/needle_read_write.go b/weed-fs/src/pkg/storage/needle_read_write.go
deleted file mode 100644
index d74aac6a1..000000000
--- a/weed-fs/src/pkg/storage/needle_read_write.go
+++ /dev/null
@@ -1,238 +0,0 @@
-package storage
-
-import (
- "errors"
- "fmt"
- "io"
- "os"
- "pkg/util"
-)
-
-const (
- FlagGzip = 0x01
- FlagHasName = 0x02
- FlagHasMime = 0x04
-)
-
-func (n *Needle) DiskSize() uint32 {
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
- return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize
-}
-func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) {
- if s, ok := w.(io.Seeker); ok {
- if end, e := s.Seek(0, 1); e == nil {
- defer func(s io.Seeker, off int64) {
- if err != nil {
- if _, e = s.Seek(off, 0); e != nil {
- fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e)
- }
- }
- }(s, end)
- } else {
- err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e)
- return
- }
- }
- switch version {
- case Version1:
- header := make([]byte, NeedleHeaderSize)
- util.Uint32toBytes(header[0:4], n.Cookie)
- util.Uint64toBytes(header[4:12], n.Id)
- n.Size = uint32(len(n.Data))
- size = n.Size
- util.Uint32toBytes(header[12:16], n.Size)
- if _, err = w.Write(header); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
- util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
- return
- case Version2:
- header := make([]byte, NeedleHeaderSize)
- util.Uint32toBytes(header[0:4], n.Cookie)
- util.Uint64toBytes(header[4:12], n.Id)
- n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime))
- if n.DataSize > 0 {
- n.Size = 4 + n.DataSize + 1
- if n.HasName() {
- n.Size = n.Size + 1 + uint32(n.NameSize)
- }
- if n.HasMime() {
- n.Size = n.Size + 1 + uint32(n.MimeSize)
- }
- }
- size = n.DataSize
- util.Uint32toBytes(header[12:16], n.Size)
- if _, err = w.Write(header); err != nil {
- return
- }
- if n.DataSize > 0 {
- util.Uint32toBytes(header[0:4], n.DataSize)
- if _, err = w.Write(header[0:4]); err != nil {
- return
- }
- if _, err = w.Write(n.Data); err != nil {
- return
- }
- util.Uint8toBytes(header[0:1], n.Flags)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- }
- if n.HasName() {
- util.Uint8toBytes(header[0:1], n.NameSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Name); err != nil {
- return
- }
- }
- if n.HasMime() {
- util.Uint8toBytes(header[0:1], n.MimeSize)
- if _, err = w.Write(header[0:1]); err != nil {
- return
- }
- if _, err = w.Write(n.Mime); err != nil {
- return
- }
- }
- padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize)
- util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- _, err = w.Write(header[0 : NeedleChecksumSize+padding])
- return n.DataSize, err
- }
- return 0, fmt.Errorf("Unsupported Version! (%d)", version)
-}
-
-func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) {
- switch version {
- case Version1:
- bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
- if ret, err = r.Read(bytes); err != nil {
- return
- }
- n.readNeedleHeader(bytes)
- n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size]
- checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize])
- if checksum != NewCRC(n.Data).Value() {
- return 0, errors.New("CRC error! Data On Disk Corrupted!")
- }
- return
- case Version2:
- if size == 0 {
- return 0, nil
- }
- bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize)
- if ret, err = r.Read(bytes); err != nil {
- return
- }
- if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) {
- return 0, errors.New("File Entry Not Found!")
- }
- n.readNeedleHeader(bytes)
- if n.Size != size {
- return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size)
- }
- n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)])
- checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize])
- if checksum != NewCRC(n.Data).Value() {
- return 0, errors.New("CRC error! Data On Disk Corrupted!")
- }
- return
- }
- return 0, fmt.Errorf("Unsupported Version! (%d)", version)
-}
-func (n *Needle) readNeedleHeader(bytes []byte) {
- n.Cookie = util.BytesToUint32(bytes[0:4])
- n.Id = util.BytesToUint64(bytes[4:12])
- n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize])
-}
-func (n *Needle) readNeedleDataVersion2(bytes []byte) {
- index, lenBytes := 0, len(bytes)
- if index < lenBytes {
- n.DataSize = util.BytesToUint32(bytes[index : index+4])
- index = index + 4
- n.Data = bytes[index : index+int(n.DataSize)]
- index = index + int(n.DataSize)
- n.Flags = bytes[index]
- index = index + 1
- }
- if index < lenBytes && n.HasName() {
- n.NameSize = uint8(bytes[index])
- index = index + 1
- n.Name = bytes[index : index+int(n.NameSize)]
- index = index + int(n.NameSize)
- }
- if index < lenBytes && n.HasMime() {
- n.MimeSize = uint8(bytes[index])
- index = index + 1
- n.Mime = bytes[index : index+int(n.MimeSize)]
- }
-}
-
-func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) {
- n = new(Needle)
- if version == Version1 || version == Version2 {
- bytes := make([]byte, NeedleHeaderSize)
- var count int
- count, err = r.Read(bytes)
- if count <= 0 || err != nil {
- return nil, 0, err
- }
- n.readNeedleHeader(bytes)
- padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize)
- bodyLength = n.Size + NeedleChecksumSize + padding
- }
- return
-}
-
-//n should be a needle whose header has already been read
-//the input stream will be read up to the next file entry
-func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) {
- if bodyLength <= 0 {
- return nil
- }
- switch version {
- case Version1:
- bytes := make([]byte, bodyLength)
- if _, err = r.Read(bytes); err != nil {
- return
- }
- n.Data = bytes[:n.Size]
- n.Checksum = NewCRC(n.Data)
- case Version2:
- bytes := make([]byte, bodyLength)
- if _, err = r.Read(bytes); err != nil {
- return
- }
- n.readNeedleDataVersion2(bytes[0:n.Size])
- n.Checksum = NewCRC(n.Data)
- default:
- err = fmt.Errorf("Unsupported Version! (%d)", version)
- }
- return
-}
-
-func (n *Needle) IsGzipped() bool {
- return n.Flags&FlagGzip > 0
-}
-func (n *Needle) SetGzipped() {
- n.Flags = n.Flags | FlagGzip
-}
-func (n *Needle) HasName() bool {
- return n.Flags&FlagHasName > 0
-}
-func (n *Needle) SetHasName() {
- n.Flags = n.Flags | FlagHasName
-}
-func (n *Needle) HasMime() bool {
- return n.Flags&FlagHasMime > 0
-}
-func (n *Needle) SetHasMime() {
- n.Flags = n.Flags | FlagHasMime
-}
diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go
deleted file mode 100644
index 0902d1016..000000000
--- a/weed-fs/src/pkg/storage/replication_type.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package storage
-
-import (
- "errors"
-)
-
-type ReplicationType string
-
-const (
- Copy000 = ReplicationType("000") // single copy
- Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
- Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
- Copy100 = ReplicationType("100") // 2 copies, each on different data center
- Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
- Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
- LengthRelicationType = 6
- CopyNil = ReplicationType(255) // nil value
-)
-
-func NewReplicationTypeFromString(t string) (ReplicationType, error) {
- switch t {
- case "000":
- return Copy000, nil
- case "001":
- return Copy001, nil
- case "010":
- return Copy010, nil
- case "100":
- return Copy100, nil
- case "110":
- return Copy110, nil
- case "200":
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:" + t)
-}
-func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
- switch b {
- case byte(000):
- return Copy000, nil
- case byte(001):
- return Copy001, nil
- case byte(010):
- return Copy010, nil
- case byte(100):
- return Copy100, nil
- case byte(110):
- return Copy110, nil
- case byte(200):
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:" + string(b))
-}
-
-func (r *ReplicationType) String() string {
- switch *r {
- case Copy000:
- return "000"
- case Copy001:
- return "001"
- case Copy010:
- return "010"
- case Copy100:
- return "100"
- case Copy110:
- return "110"
- case Copy200:
- return "200"
- }
- return "000"
-}
-func (r *ReplicationType) Byte() byte {
- switch *r {
- case Copy000:
- return byte(000)
- case Copy001:
- return byte(001)
- case Copy010:
- return byte(010)
- case Copy100:
- return byte(100)
- case Copy110:
- return byte(110)
- case Copy200:
- return byte(200)
- }
- return byte(000)
-}
-
-func (repType ReplicationType) GetReplicationLevelIndex() int {
- switch repType {
- case Copy000:
- return 0
- case Copy001:
- return 1
- case Copy010:
- return 2
- case Copy100:
- return 3
- case Copy110:
- return 4
- case Copy200:
- return 5
- }
- return -1
-}
-func (repType ReplicationType) GetCopyCount() int {
- switch repType {
- case Copy000:
- return 1
- case Copy001:
- return 2
- case Copy010:
- return 2
- case Copy100:
- return 2
- case Copy110:
- return 3
- case Copy200:
- return 3
- }
- return 0
-}
diff --git a/weed-fs/src/pkg/storage/sample.idx b/weed-fs/src/pkg/storage/sample.idx
deleted file mode 100644
index 44918b41d..000000000
--- a/weed-fs/src/pkg/storage/sample.idx
+++ /dev/null
Binary files differ
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
deleted file mode 100644
index 69e37c606..000000000
--- a/weed-fs/src/pkg/storage/store.go
+++ /dev/null
@@ -1,204 +0,0 @@
-package storage
-
-import (
- "encoding/json"
- "errors"
- "io/ioutil"
- "log"
- "net/url"
- "pkg/util"
- "strconv"
- "strings"
-)
-
-type Store struct {
- volumes map[VolumeId]*Volume
- dir string
- Port int
- Ip string
- PublicUrl string
- MaxVolumeCount int
-
- masterNode string
- connected bool
- volumeSizeLimit uint64 //read from the master
-
-}
-
-func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *Store) {
- s = &Store{Port: port, Ip: ip, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
- s.volumes = make(map[VolumeId]*Volume)
- s.loadExistingVolumes()
-
- log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes")
- return
-}
-func (s *Store) AddVolume(volumeListString string, replicationType string) error {
- rt, e := NewReplicationTypeFromString(replicationType)
- if e != nil {
- return e
- }
- for _, range_string := range strings.Split(volumeListString, ",") {
- if strings.Index(range_string, "-") < 0 {
- id_string := range_string
- id, err := NewVolumeId(id_string)
- if err != nil {
- return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
- }
- e = s.addVolume(VolumeId(id), rt)
- } else {
- pair := strings.Split(range_string, "-")
- start, start_err := strconv.ParseUint(pair[0], 10, 64)
- if start_err != nil {
- return errors.New("Volume Start Id" + pair[0] + " is not a valid unsigned integer!")
- }
- end, end_err := strconv.ParseUint(pair[1], 10, 64)
- if end_err != nil {
- return errors.New("Volume End Id" + pair[1] + " is not a valid unsigned integer!")
- }
- for id := start; id <= end; id++ {
- if err := s.addVolume(VolumeId(id), rt); err != nil {
- e = err
- }
- }
- }
- }
- return e
-}
-func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) (err error) {
- if s.volumes[vid] != nil {
- return errors.New("Volume Id " + vid.String() + " already exists!")
- }
- log.Println("In dir", s.dir, "adds volume =", vid, ", replicationType =", replicationType)
- s.volumes[vid], err = NewVolume(s.dir, vid, replicationType)
- return err
-}
-
-func (s *Store) CheckCompactVolume(volumeIdString string, garbageThresholdString string) (error, bool) {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!"), false
- }
- garbageThreshold, e := strconv.ParseFloat(garbageThresholdString, 32)
- if e != nil {
- return errors.New("garbageThreshold " + garbageThresholdString + " is not a valid float number!"), false
- }
- return nil, garbageThreshold < s.volumes[vid].garbageLevel()
-}
-func (s *Store) CompactVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
- }
- return s.volumes[vid].compact()
-}
-func (s *Store) CommitCompactVolume(volumeIdString string) error {
- vid, err := NewVolumeId(volumeIdString)
- if err != nil {
- return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
- }
- return s.volumes[vid].commitCompact()
-}
-func (s *Store) loadExistingVolumes() {
- if dirs, err := ioutil.ReadDir(s.dir); err == nil {
- for _, dir := range dirs {
- name := dir.Name()
- if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
- base := name[:len(name)-len(".dat")]
- if vid, err := NewVolumeId(base); err == nil {
- if s.volumes[vid] == nil {
- if v, e := NewVolume(s.dir, vid, CopyNil); e == nil {
- s.volumes[vid] = v
- log.Println("In dir", s.dir, "read volume =", vid, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
- }
- }
- }
- }
- }
- }
-}
-func (s *Store) Status() []*VolumeInfo {
- var stats []*VolumeInfo
- for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
- VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
- stats = append(stats, s)
- }
- return stats
-}
-
-type JoinResult struct {
- VolumeSizeLimit uint64
-}
-
-func (s *Store) SetMaster(mserver string) {
- s.masterNode = mserver
-}
-func (s *Store) Join() error {
- stats := new([]*VolumeInfo)
- for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
- VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
- *stats = append(*stats, s)
- }
- bytes, _ := json.Marshal(stats)
- values := make(url.Values)
- if !s.connected {
- values.Add("init", "true")
- }
- values.Add("port", strconv.Itoa(s.Port))
- values.Add("ip", s.Ip)
- values.Add("publicUrl", s.PublicUrl)
- values.Add("volumes", string(bytes))
- values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
- jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
- if err != nil {
- return err
- }
- var ret JoinResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- s.volumeSizeLimit = ret.VolumeSizeLimit
- s.connected = true
- return nil
-}
-func (s *Store) Close() {
- for _, v := range s.volumes {
- v.Close()
- }
-}
-func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
- if v := s.volumes[i]; v != nil {
- size, err = v.write(n)
- if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
- log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
- s.Join()
- }
- return
- }
- log.Println("volume", i, "not found!")
- return
-}
-func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
- if v := s.volumes[i]; v != nil {
- return v.delete(n)
- }
- return 0, nil
-}
-func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
- if v := s.volumes[i]; v != nil {
- return v.read(n)
- }
- return 0, errors.New("Not Found")
-}
-func (s *Store) GetVolume(i VolumeId) *Volume {
- return s.volumes[i]
-}
-
-func (s *Store) HasVolume(i VolumeId) bool {
- _, ok := s.volumes[i]
- return ok
-}
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
deleted file mode 100644
index 707c6e6f8..000000000
--- a/weed-fs/src/pkg/storage/volume.go
+++ /dev/null
@@ -1,274 +0,0 @@
-package storage
-
-import (
- "errors"
- "fmt"
- "io"
- "os"
- "path"
- "sync"
-)
-
-const (
- SuperBlockSize = 8
-)
-
-type SuperBlock struct {
- Version Version
- ReplicaType ReplicationType
-}
-
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, SuperBlockSize)
- header[0] = byte(s.Version)
- header[1] = s.ReplicaType.Byte()
- return header
-}
-
-type Volume struct {
- Id VolumeId
- dir string
- dataFile *os.File
- nm *NeedleMap
-
- SuperBlock
-
- accessLock sync.Mutex
-}
-
-func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
- v = &Volume{dir: dirname, Id: id}
- v.SuperBlock = SuperBlock{ReplicaType: replicationType}
- e = v.load(true)
- return
-}
-func LoadVolumeOnly(dirname string, id VolumeId) (v *Volume, e error) {
- v = &Volume{dir: dirname, Id: id}
- v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
- e = v.load(false)
- return
-}
-func (v *Volume) load(alsoLoadIndex bool) error {
- var e error
- fileName := path.Join(v.dir, v.Id.String())
- v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
- if e != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
- }
- if v.ReplicaType == CopyNil {
- if e = v.readSuperBlock(); e != nil {
- return e
- }
- } else {
- v.maybeWriteSuperBlock()
- }
- if alsoLoadIndex {
- indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
- if ie != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
- }
- v.nm = LoadNeedleMap(indexFile)
- }
- return nil
-}
-func (v *Volume) Version() Version {
- return v.SuperBlock.Version
-}
-func (v *Volume) Size() int64 {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- stat, e := v.dataFile.Stat()
- if e == nil {
- return stat.Size()
- }
- fmt.Printf("Failed to read file size %s %s\n", v.dataFile.Name(), e.Error())
- return -1
-}
-func (v *Volume) Close() {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- v.nm.Close()
- v.dataFile.Close()
-}
-func (v *Volume) maybeWriteSuperBlock() {
- stat, e := v.dataFile.Stat()
- if e != nil {
- fmt.Printf("failed to stat datafile %s: %s", v.dataFile, e)
- return
- }
- if stat.Size() == 0 {
- v.SuperBlock.Version = CurrentVersion
- v.dataFile.Write(v.SuperBlock.Bytes())
- }
-}
-func (v *Volume) readSuperBlock() (err error) {
- v.dataFile.Seek(0, 0)
- header := make([]byte, SuperBlockSize)
- if _, e := v.dataFile.Read(header); e != nil {
- return fmt.Errorf("cannot read superblock: %s", e)
- }
- v.SuperBlock, err = ParseSuperBlock(header)
- return err
-}
-func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
- superBlock.Version = Version(header[0])
- if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err)
- }
- return
-}
-func (v *Volume) NeedToReplicate() bool {
- return v.ReplicaType.GetCopyCount() > 1
-}
-
-func (v *Volume) write(n *Needle) (size uint32, err error) {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- var offset int64
- if offset, err = v.dataFile.Seek(0, 2); err != nil {
- return
- }
- if size, err = n.Append(v.dataFile, v.Version()); err != nil {
- return
- }
- nv, ok := v.nm.Get(n.Id)
- if !ok || int64(nv.Offset)*NeedlePaddingSize < offset {
- _, err = v.nm.Put(n.Id, uint32(offset/NeedlePaddingSize), n.Size)
- }
- return
-}
-func (v *Volume) delete(n *Needle) (uint32, error) {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok {
- v.nm.Delete(n.Id)
- v.dataFile.Seek(int64(nv.Offset*NeedlePaddingSize), 0)
- _, err := n.Append(v.dataFile, v.Version())
- return nv.Size, err
- }
- return 0, nil
-}
-
-func (v *Volume) read(n *Needle) (int, error) {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- nv, ok := v.nm.Get(n.Id)
- if ok && nv.Offset > 0 {
- v.dataFile.Seek(int64(nv.Offset)*NeedlePaddingSize, 0)
- return n.Read(v.dataFile, nv.Size, v.Version())
- }
- return -1, errors.New("Not Found")
-}
-
-func (v *Volume) garbageLevel() float64 {
- return float64(v.nm.deletionByteCounter) / float64(v.ContentSize())
-}
-
-func (v *Volume) compact() error {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
-
- filePath := path.Join(v.dir, v.Id.String())
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx")
-}
-func (v *Volume) commitCompact() error {
- v.accessLock.Lock()
- defer v.accessLock.Unlock()
- v.dataFile.Close()
- var e error
- if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpd"), path.Join(v.dir, v.Id.String()+".dat")); e != nil {
- return e
- }
- if e = os.Rename(path.Join(v.dir, v.Id.String()+".cpx"), path.Join(v.dir, v.Id.String()+".idx")); e != nil {
- return e
- }
- if e = v.load(true); e != nil {
- return e
- }
- return nil
-}
-
-func ScanVolumeFile(dirname string, id VolumeId,
- visitSuperBlock func(SuperBlock) error,
- visitNeedle func(n *Needle, offset uint32) error) (err error) {
- var v *Volume
- if v, err = LoadVolumeOnly(dirname, id); err != nil {
- return
- }
- if err = visitSuperBlock(v.SuperBlock); err != nil {
- return
- }
-
- version := v.Version()
-
- offset := uint32(SuperBlockSize)
- n, rest, e := ReadNeedleHeader(v.dataFile, version)
- if e != nil {
- err = fmt.Errorf("cannot read needle header: %s", e)
- return
- }
- for n != nil {
- if err = n.ReadNeedleBody(v.dataFile, version, rest); err != nil {
- err = fmt.Errorf("cannot read needle body: %s", err)
- return
- }
- if err = visitNeedle(n, offset); err != nil {
- return
- }
- offset += NeedleHeaderSize + rest
- if n, rest, err = ReadNeedleHeader(v.dataFile, version); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header: %s", err)
- }
- }
-
- return
-}
-
-func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err error) {
- var (
- dst, idx *os.File
- )
- if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
- return
- }
- defer dst.Close()
-
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE, 0644); err != nil {
- return
- }
- defer idx.Close()
-
- nm := NewNeedleMap(idx)
- new_offset := uint32(SuperBlockSize)
-
- err = ScanVolumeFile(v.dir, v.Id, func(superBlock SuperBlock) error {
- _, err = dst.Write(superBlock.Bytes())
- return err
- }, func(n *Needle, offset uint32) error {
- nv, ok := v.nm.Get(n.Id)
- //log.Println("file size is", n.Size, "rest", rest)
- if ok && nv.Offset*NeedlePaddingSize == offset {
- if nv.Size > 0 {
- if _, err = nm.Put(n.Id, new_offset/NeedlePaddingSize, n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
- }
- if _, err = n.Append(dst, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
- }
- new_offset += n.DiskSize()
- //log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
- }
- }
- return nil
- })
-
- return
-}
-func (v *Volume) ContentSize() uint64 {
- return v.nm.fileByteCounter
-}
diff --git a/weed-fs/src/pkg/storage/volume_id.go b/weed-fs/src/pkg/storage/volume_id.go
deleted file mode 100644
index 0333c6cf0..000000000
--- a/weed-fs/src/pkg/storage/volume_id.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package storage
-
-import (
- "strconv"
-)
-
-type VolumeId uint32
-
-func NewVolumeId(vid string) (VolumeId, error) {
- volumeId, err := strconv.ParseUint(vid, 10, 32)
- return VolumeId(volumeId), err
-}
-func (vid *VolumeId) String() string {
- return strconv.FormatUint(uint64(*vid), 10)
-}
-func (vid *VolumeId) Next() VolumeId {
- return VolumeId(uint32(*vid) + 1)
-}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
deleted file mode 100644
index e4c5f6ec4..000000000
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package storage
-
-type VolumeInfo struct {
- Id VolumeId
- Size uint64
- RepType ReplicationType
- Version Version
- FileCount int
- DeleteCount int
- DeletedByteCount uint64
-}
diff --git a/weed-fs/src/pkg/storage/volume_version.go b/weed-fs/src/pkg/storage/volume_version.go
deleted file mode 100644
index 9702ae904..000000000
--- a/weed-fs/src/pkg/storage/volume_version.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package storage
-
-type Version uint8
-
-const (
- Version1 = Version(1)
- Version2 = Version(2)
- CurrentVersion = Version2
-)
diff --git a/weed-fs/src/pkg/topology/configuration.go b/weed-fs/src/pkg/topology/configuration.go
deleted file mode 100644
index 4c8424214..000000000
--- a/weed-fs/src/pkg/topology/configuration.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package topology
-
-import (
- "encoding/xml"
-)
-
-type loc struct {
- dcName string
- rackName string
-}
-type rack struct {
- Name string `xml:"name,attr"`
- Ips []string `xml:"Ip"`
-}
-type dataCenter struct {
- Name string `xml:"name,attr"`
- Racks []rack `xml:"Rack"`
-}
-type topology struct {
- DataCenters []dataCenter `xml:"DataCenter"`
-}
-type Configuration struct {
- XMLName xml.Name `xml:"Configuration"`
- Topo topology `xml:"Topology"`
- ip2location map[string]loc
-}
-
-func NewConfiguration(b []byte) (*Configuration, error) {
- c := &Configuration{}
- err := xml.Unmarshal(b, c)
- c.ip2location = make(map[string]loc)
- for _, dc := range c.Topo.DataCenters {
- for _, rack := range dc.Racks {
- for _, ip := range rack.Ips {
- c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
- }
- }
- }
- return c, err
-}
-
-func (c *Configuration) String() string {
- if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
- return string(b)
- }
- return ""
-}
-
-func (c *Configuration) Locate(ip string) (dc string, rack string) {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
- return "DefaultDataCenter", "DefaultRack"
-}
diff --git a/weed-fs/src/pkg/topology/configuration_test.go b/weed-fs/src/pkg/topology/configuration_test.go
deleted file mode 100644
index 35d82c058..000000000
--- a/weed-fs/src/pkg/topology/configuration_test.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package topology
-
-import (
- "fmt"
- "testing"
-)
-
-func TestLoadConfiguration(t *testing.T) {
-
- confContent := `
-
-<?xml version="1.0" encoding="UTF-8" ?>
-<Configuration>
- <Topology>
- <DataCenter name="dc1">
- <Rack name="rack1">
- <Ip>192.168.1.1</Ip>
- </Rack>
- </DataCenter>
- <DataCenter name="dc2">
- <Rack name="rack1">
- <Ip>192.168.1.2</Ip>
- </Rack>
- <Rack name="rack2">
- <Ip>192.168.1.3</Ip>
- <Ip>192.168.1.4</Ip>
- </Rack>
- </DataCenter>
- </Topology>
-</Configuration>
-`
- c, err := NewConfiguration([]byte(confContent))
-
- fmt.Printf("%s\n", c)
- if err != nil {
- t.Fatalf("unmarshal error:%s", err.Error())
- }
-
- if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
- t.Fatalf("unmarshal error:%s", c)
- }
-}
diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go
deleted file mode 100644
index a3b2b7d13..000000000
--- a/weed-fs/src/pkg/topology/data_center.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package topology
-
-type DataCenter struct {
- NodeImpl
-}
-
-func NewDataCenter(id string) *DataCenter {
- dc := &DataCenter{}
- dc.id = NodeId(id)
- dc.nodeType = "DataCenter"
- dc.children = make(map[NodeId]Node)
- dc.NodeImpl.value = dc
- return dc
-}
-
-func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- if string(rack.Id()) == rackName {
- return rack
- }
- }
- rack := NewRack(rackName)
- dc.LinkChildNode(rack)
- return rack
-}
-
-func (dc *DataCenter) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = dc.GetMaxVolumeCount()
- m["Free"] = dc.FreeSpace()
- var racks []interface{}
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- racks = append(racks, rack.ToMap())
- }
- m["Racks"] = racks
- return m
-}
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
deleted file mode 100644
index 01f8f768a..000000000
--- a/weed-fs/src/pkg/topology/data_node.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package topology
-
-import (
- _ "fmt"
- "pkg/storage"
- "strconv"
-)
-
-type DataNode struct {
- NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
- Dead bool
-}
-
-func NewDataNode(id string) *DataNode {
- s := &DataNode{}
- s.id = NodeId(id)
- s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
- s.NodeImpl.value = s
- return s
-}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
- if _, ok := dn.volumes[v.Id]; !ok {
- dn.volumes[v.Id] = v
- dn.UpAdjustVolumeCountDelta(1)
- dn.UpAdjustActiveVolumeCountDelta(1)
- dn.UpAdjustMaxVolumeId(v.Id)
- } else {
- dn.volumes[v.Id] = v
- }
-}
-func (dn *DataNode) GetTopology() *Topology {
- p := dn.parent
- for p.Parent() != nil {
- p = p.Parent()
- }
- t := p.(*Topology)
- return t
-}
-func (dn *DataNode) MatchLocation(ip string, port int) bool {
- return dn.Ip == ip && dn.Port == port
-}
-func (dn *DataNode) Url() string {
- return dn.Ip + ":" + strconv.Itoa(dn.Port)
-}
-
-func (dn *DataNode) ToMap() interface{} {
- ret := make(map[string]interface{})
- ret["Url"] = dn.Url()
- ret["Volumes"] = dn.GetVolumeCount()
- ret["Max"] = dn.GetMaxVolumeCount()
- ret["Free"] = dn.FreeSpace()
- ret["PublicUrl"] = dn.PublicUrl
- return ret
-}
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
deleted file mode 100644
index 02b427a2b..000000000
--- a/weed-fs/src/pkg/topology/node.go
+++ /dev/null
@@ -1,200 +0,0 @@
-package topology
-
-import (
- "fmt"
- "pkg/storage"
-)
-
-type NodeId string
-type Node interface {
- Id() NodeId
- String() string
- FreeSpace() int
- ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
-
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
- GetMaxVolumeId() storage.VolumeId
- SetParent(Node)
- LinkChildNode(node Node)
- UnlinkChildNode(nodeId NodeId)
- CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
-
- IsDataNode() bool
- Children() map[NodeId]Node
- Parent() Node
-
- GetValue() interface{} //get reference to the topology,dc,rack,datanode
-}
-type NodeImpl struct {
- id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
- parent Node
- children map[NodeId]Node
- maxVolumeId storage.VolumeId
-
- //for rack, data center, topology
- nodeType string
- value interface{}
-}
-
-func (n *NodeImpl) IsDataNode() bool {
- return n.nodeType == "DataNode"
-}
-func (n *NodeImpl) IsRack() bool {
- return n.nodeType == "Rack"
-}
-func (n *NodeImpl) IsDataCenter() bool {
- return n.nodeType == "DataCenter"
-}
-func (n *NodeImpl) String() string {
- if n.parent != nil {
- return n.parent.String() + ":" + string(n.id)
- }
- return string(n.id)
-}
-func (n *NodeImpl) Id() NodeId {
- return n.id
-}
-func (n *NodeImpl) FreeSpace() int {
- return n.maxVolumeCount - n.volumeCount
-}
-func (n *NodeImpl) SetParent(node Node) {
- n.parent = node
-}
-func (n *NodeImpl) Children() map[NodeId]Node {
- return n.children
-}
-func (n *NodeImpl) Parent() Node {
- return n.parent
-}
-func (n *NodeImpl) GetValue() interface{} {
- return n.value
-}
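-// ReserveOneVolume walks down the tree guided by r, a random index into this
-// subtree's free slots: children are skipped (and r reduced by their free space)
-// until r falls inside one of them, ending at a data node with spare capacity.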
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
- ret := false
- var assignedNode *DataNode
- for _, node := range n.children {
- freeSpace := node.FreeSpace()
- //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
- if freeSpace <= 0 {
- continue
- }
- if r >= freeSpace {
- r -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
- }
- ret, assignedNode = node.ReserveOneVolume(r, vid)
- if ret {
- break
- }
- }
- }
- return ret, assignedNode
-}
-
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
- if n.maxVolumeId < vid {
- n.maxVolumeId = vid
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeId(vid)
- }
- }
-}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
- return n.maxVolumeId
-}
-func (n *NodeImpl) GetVolumeCount() int {
- return n.volumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int {
- return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int {
- return n.maxVolumeCount
-}
-
-func (n *NodeImpl) LinkChildNode(node Node) {
- if n.children[node.Id()] == nil {
- n.children[node.Id()] = node
- n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
- n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
- n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
- node.SetParent(n)
- fmt.Println(n, "adds child", node.Id())
- }
-}
-
-func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
- node := n.children[nodeId]
- if node != nil {
- node.SetParent(nil)
- delete(n.children, node.Id())
- n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
- n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
- fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
- }
-}
-
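-// CollectDeadNodeAndFullVolumes descends to the rack level, reporting data nodes
-// not seen since freshThreshHold on chanDeadDataNodes and volumes that reached
-// volumeSizeLimit on chanFullVolumes.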
-func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
- if n.IsRack() {
- for _, c := range n.Children() {
- dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen < freshThreshHold {
- if !dn.Dead {
- dn.Dead = true
- n.GetTopology().chanDeadDataNodes <- dn
- }
- }
- for _, v := range dn.volumes {
- if uint64(v.Size) >= volumeSizeLimit {
- //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- v
- }
- }
- }
- } else {
- for _, c := range n.Children() {
- c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
- }
- }
-}
-
-func (n *NodeImpl) GetTopology() *Topology {
- var p Node
- p = n
- for p.Parent() != nil {
- p = p.Parent()
- }
- return p.GetValue().(*Topology)
-}
diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go
deleted file mode 100644
index 3115e0213..000000000
--- a/weed-fs/src/pkg/topology/node_list.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package topology
-
-import (
- "fmt"
- "math/rand"
- "pkg/storage"
-)
-
-type NodeList struct {
- nodes map[NodeId]Node
- except map[string]Node
-}
-
-func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList {
- m := make(map[NodeId]Node)
- for _, n := range nodes {
- if except[n.String()] == nil {
- m[n.Id()] = n
- }
- }
- nl := &NodeList{nodes: m}
- return nl
-}
-
-func (nl *NodeList) FreeSpace() int {
- freeSpace := 0
- for _, n := range nl.nodes {
- freeSpace += n.FreeSpace()
- }
- return freeSpace
-}
-
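-// RandomlyPickN gathers the nodes with at least min free slots, randomly swaps
-// candidates into the tail of that list, and returns the last n entries; it
-// reports false when fewer than n nodes qualify.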
-func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
- var list []Node
- for _, n := range nl.nodes {
- if n.FreeSpace() >= min {
- list = append(list, n)
- }
- }
- if n > len(list) {
- return nil, false
- }
- for i := len(list); i > len(list)-n; i-- {
- r := rand.Intn(i)
- t := list[r]
- list[r] = list[i-1]
- list[i-1] = t
- }
- return list[len(list)-n:], true
-}
-
-func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
- for _, node := range nl.nodes {
- freeSpace := node.FreeSpace()
- if randomVolumeIndex >= freeSpace {
- randomVolumeIndex -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
- }
- children := node.Children()
- newNodeList := NewNodeList(children, nl.except)
- return newNodeList.ReserveOneVolume(randomVolumeIndex, vid)
- }
- }
- return false, nil
-
-}
diff --git a/weed-fs/src/pkg/topology/node_list_test.go b/weed-fs/src/pkg/topology/node_list_test.go
deleted file mode 100644
index 2fb4fa970..000000000
--- a/weed-fs/src/pkg/topology/node_list_test.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package topology
-
-import (
- _ "fmt"
- "strconv"
- "testing"
-)
-
-func TestXYZ(t *testing.T) {
- topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5)
- for i := 0; i < 5; i++ {
- dc := NewDataCenter("dc" + strconv.Itoa(i))
- dc.activeVolumeCount = i
- dc.maxVolumeCount = 5
- topo.LinkChildNode(dc)
- }
- nl := NewNodeList(topo.Children(), nil)
-
- picked, ret := nl.RandomlyPickN(1, 1)
- if !ret || len(picked) != 1 {
- t.Errorf("need to randomly pick 1 node")
- }
-
- picked, ret = nl.RandomlyPickN(4, 1)
- if !ret || len(picked) != 4 {
- t.Errorf("need to randomly pick 4 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(5, 1)
- if !ret || len(picked) != 5 {
- t.Errorf("need to randomly pick 5 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(6, 1)
- if ret || len(picked) != 0 {
- t.Errorf("can not randomly pick 6 nodes: %v %v", ret, picked)
- }
-
-}
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
deleted file mode 100644
index acc34417a..000000000
--- a/weed-fs/src/pkg/topology/rack.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package topology
-
-import (
- "strconv"
- "time"
-)
-
-type Rack struct {
- NodeImpl
-}
-
-func NewRack(id string) *Rack {
- r := &Rack{}
- r.id = NodeId(id)
- r.nodeType = "Rack"
- r.children = make(map[NodeId]Node)
- r.NodeImpl.value = r
- return r
-}
-
-func (r *Rack) FindDataNode(ip string, port int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- return dn
- }
- }
- return nil
-}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- if dn.Dead {
- dn.Dead = false
- r.GetTopology().chanRecoveredDataNodes <- dn
- dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
- }
- return dn
- }
- }
- dn := NewDataNode(ip + ":" + strconv.Itoa(port))
- dn.Ip = ip
- dn.Port = port
- dn.PublicUrl = publicUrl
- dn.maxVolumeCount = maxVolumeCount
- dn.LastSeen = time.Now().Unix()
- r.LinkChildNode(dn)
- return dn
-}
-
-func (rack *Rack) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = rack.GetMaxVolumeCount()
- m["Free"] = rack.FreeSpace()
- var dns []interface{}
- for _, c := range rack.Children() {
- dn := c.(*DataNode)
- dns = append(dns, dn.ToMap())
- }
- m["DataNodes"] = dns
- return m
-}
diff --git a/weed-fs/src/pkg/topology/topo_test.go b/weed-fs/src/pkg/topology/topo_test.go
deleted file mode 100644
index 71a901c8e..000000000
--- a/weed-fs/src/pkg/topology/topo_test.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "fmt"
- "math/rand"
- "pkg/storage"
- "testing"
- "time"
-)
-
-var topologyLayout = `
-{
- "dc1":{
- "rack1":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server2":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server1":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server2":{
- "volumes":[],
- "limit":4
- },
- "server3":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":2
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server1":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
-}
-`
-
-func setup(topologyLayout string) *Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
-
- //need to connect all nodes first before servers add volumes
- topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- rack := NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(rack)
- for serverKey, serverValue := range rackMap {
- server := NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- rack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: uint64(m["size"].(float64)), Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- }
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
- }
- }
- }
-
- return topo
-}
-
-func TestRemoveDataCenter(t *testing.T) {
- topo := setup(topologyLayout)
- topo.UnlinkChildNode(NodeId("dc2"))
- if topo.GetActiveVolumeCount() != 15 {
- t.Fail()
- }
- topo.UnlinkChildNode(NodeId("dc3"))
- if topo.GetActiveVolumeCount() != 12 {
- t.Fail()
- }
-}
-
-func TestReserveOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- rand.Seed(1)
- ret, node, vid := topo.RandomlyReserveOneVolume()
- fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
-
-}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
deleted file mode 100644
index 4fd109a99..000000000
--- a/weed-fs/src/pkg/topology/topology.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package topology
-
-import (
- "errors"
- "io/ioutil"
- "math/rand"
- "pkg/directory"
- "pkg/sequence"
- "pkg/storage"
-)
-
-type Topology struct {
- NodeImpl
-
- //transient vid~servers mapping for each replication type
- replicaType2VolumeLayout []*VolumeLayout
-
- pulse int64
-
- volumeSizeLimit uint64
-
- sequence sequence.Sequencer
-
- chanDeadDataNodes chan *DataNode
- chanRecoveredDataNodes chan *DataNode
- chanFullVolumes chan storage.VolumeInfo
-
- configuration *Configuration
-}
-
-func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
- t := &Topology{}
- t.id = NodeId(id)
- t.nodeType = "Topology"
- t.NodeImpl.value = t
- t.children = make(map[NodeId]Node)
- t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
- t.pulse = int64(pulse)
- t.volumeSizeLimit = volumeSizeLimit
-
- t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
-
- t.chanDeadDataNodes = make(chan *DataNode)
- t.chanRecoveredDataNodes = make(chan *DataNode)
- t.chanFullVolumes = make(chan storage.VolumeInfo)
-
- t.loadConfiguration(confFile)
-
- return t
-}
-
-func (t *Topology) loadConfiguration(configurationFile string) error {
- b, e := ioutil.ReadFile(configurationFile)
- if e == nil {
- t.configuration, e = NewConfiguration(b)
- }
- return e
-}
-
-func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- if list := vl.Lookup(vid); list != nil {
- return list
- }
- }
- }
- return nil
-}
-
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
- if t.FreeSpace() <= 0 {
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
- return ret, node, &vid
-}
-
-func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
- freeSpace := t.FreeSpace()
- for _, node := range except {
- freeSpace -= node.FreeSpace()
- }
- if freeSpace <= 0 {
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
- return ret, node, &vid
-}
-
-func (t *Topology) NextVolumeId() storage.VolumeId {
- vid := t.GetMaxVolumeId()
- return vid.Next()
-}
-
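-// PickForWrite selects a random writable volume for the requested replication
-// type, reserves count file ids from the sequencer, and returns the generated
-// file id string together with the first data node holding that volume.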
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
- if err != nil {
- return "", 0, nil, errors.New("No writable volumes avalable!")
- }
- fileId, count := t.sequence.NextFileId(count)
- return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
-}
-
-func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- return t.replicaType2VolumeLayout[replicationTypeIndex]
-}
-
-func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
-}
-
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
- dcName, rackName := t.configuration.Locate(ip)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(ip, port)
- if init && dn != nil {
- t.UnRegisterDataNode(dn)
- }
- dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
- for _, v := range volumeInfos {
- dn.AddOrUpdateVolume(v)
- t.RegisterVolumeLayout(&v, dn)
- }
-}
-
-func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- if string(dc.Id()) == dcName {
- return dc
- }
- }
- dc := NewDataCenter(dcName)
- t.LinkChildNode(dc)
- return dc
-}
diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go
deleted file mode 100644
index 93c8e5511..000000000
--- a/weed-fs/src/pkg/topology/topology_compact.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "pkg/storage"
- "pkg/util"
- "time"
-)
-
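-// Vacuuming is a three-phase protocol run against every replica of a volume:
-// check asks each volume server whether its garbage ratio exceeds the threshold,
-// compact rewrites the volume on each server, and commit swaps in the compacted
-// files and marks the volume writable again.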
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url())
- if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
- //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
- ch <- ret
- }
- }(index, dn.Url(), vid)
- }
- isCheckSuccess := true
- for _ = range locationlist.list {
- select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
- case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
- }
- }
- return isCheckSuccess
-}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- vl.removeFromWritable(vid)
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- fmt.Println(index, "Start vacuuming", vid, "on", dn.Url())
- if e := vacuumVolume_Compact(url, vid); e != nil {
- fmt.Println(index, "Error when vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- fmt.Println(index, "Complete vacuuming", vid, "on", url)
- ch <- true
- }
- }(index, dn.Url(), vid)
- }
- isVacuumSuccess := true
- for _ = range locationlist.list {
- select {
- case _ = <-ch:
- case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
- }
- }
- return isVacuumSuccess
-}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- isCommitSuccess := true
- for _, dn := range locationlist.list {
- fmt.Println("Start Commiting vacuum", vid, "on", dn.Url())
- if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
- fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e)
- isCommitSuccess = false
- } else {
- fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url())
- }
- }
- if isCommitSuccess {
- vl.setVolumeWritable(vid)
- }
- return isCommitSuccess
-}
-func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
- }
- }
- }
- }
- }
- return 0
-}
-
-type VacuumVolumeResult struct {
- Result bool
- Error string
-}
-
-func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("garbageThreshold", garbageThreshold)
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
- if err != nil {
- fmt.Println("parameters:", values)
- return err, false
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err, false
- }
- if ret.Error != "" {
- return errors.New(ret.Error), false
- }
- return nil, ret.Result
-}
-func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
-func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go
deleted file mode 100644
index debedc3d3..000000000
--- a/weed-fs/src/pkg/topology/topology_event_handling.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package topology
-
-import (
- "fmt"
- "math/rand"
- "pkg/storage"
- "time"
-)
-
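-// StartRefreshWritableVolumes launches three background loops: one flags dead
-// nodes and full volumes based on the heartbeat pulse, one vacuums every 15
-// minutes, and one reacts to the full/recovered/dead events from the channels.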
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
- go func() {
- for {
- freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
- t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
- time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
- go func(garbageThreshold string) {
- c := time.Tick(15 * time.Minute)
- for _ = range c {
- t.Vacuum(garbageThreshold)
- }
- }(garbageThreshold)
- go func() {
- for {
- select {
- case v := <-t.chanFullVolumes:
- t.SetVolumeCapacityFull(v)
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- fmt.Println("DataNode", dn, "is back alive!")
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- fmt.Println("DataNode", dn, "is dead!")
- }
- }
- }()
-}
-func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
- if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
- return false
- }
- for _, dn := range vl.vid2location[volumeInfo.Id].list {
- dn.UpAdjustActiveVolumeCountDelta(-1)
- }
- return true
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.RepType)
- vl.SetVolumeUnavailable(dn, v.Id)
- }
- dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
- dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
- dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
- dn.Parent().UnlinkChildNode(dn.Id())
-}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.RepType)
- if vl.isWritable(&v) {
- vl.SetVolumeAvailable(dn, v.Id)
- }
- }
-}
diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go
deleted file mode 100644
index b416ee943..000000000
--- a/weed-fs/src/pkg/topology/topology_map.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package topology
-
-func (t *Topology) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- var dcs []interface{}
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- dcs = append(dcs, dc.ToMap())
- }
- m["DataCenters"] = dcs
- var layouts []interface{}
- for _, layout := range t.replicaType2VolumeLayout {
- if layout != nil {
- layouts = append(layouts, layout.ToMap())
- }
- }
- m["layouts"] = layouts
- return m
-}
-
-func (t *Topology) ToVolumeMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- dcs := make(map[NodeId]interface{})
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- racks := make(map[NodeId]interface{})
- for _, r := range dc.Children() {
- rack := r.(*Rack)
- dataNodes := make(map[NodeId]interface{})
- for _, d := range rack.Children() {
- dn := d.(*DataNode)
- var volumes []interface{}
- for _, v := range dn.volumes {
- volumes = append(volumes, v)
- }
- dataNodes[d.Id()] = volumes
- }
- racks[r.Id()] = dataNodes
- }
- dcs[dc.Id()] = racks
- }
- m["DataCenters"] = dcs
- return m
-}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
deleted file mode 100644
index 6d62128d9..000000000
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package topology
-
-import (
- "errors"
- "fmt"
- "math/rand"
- "pkg/storage"
-)
-
-type VolumeLayout struct {
- repType storage.ReplicationType
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- pulse int64
- volumeSizeLimit uint64
-}
-
-func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
- return &VolumeLayout{
- repType: repType,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- pulse: pulse,
- volumeSizeLimit: volumeSizeLimit,
- }
-}
-
-func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
- if _, ok := vl.vid2location[v.Id]; !ok {
- vl.vid2location[v.Id] = NewVolumeLocationList()
- }
- if vl.vid2location[v.Id].Add(dn) {
- if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
- if vl.isWritable(v) {
- vl.writables = append(vl.writables, v.Id)
- }
- }
- }
-}
-
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
- return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
-}
-
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
- return vl.vid2location[vid].list
-}
-
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
- len_writers := len(vl.writables)
- if len_writers <= 0 {
- fmt.Println("No more writable volumes!")
- return nil, 0, nil, errors.New("No more writable volumes!")
- }
- vid := vl.writables[rand.Intn(len_writers)]
- locationList := vl.vid2location[vid]
- if locationList != nil {
- return &vid, count, locationList, nil
- }
- return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
-}
-
-func (vl *VolumeLayout) GetActiveVolumeCount() int {
- return len(vl.writables)
-}
-
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
- for i, v := range vl.writables {
- if v == vid {
- fmt.Println("Volume", vid, "becomes unwritable")
- vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
- return true
- }
- }
- return false
-}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
- for _, v := range vl.writables {
- if v == vid {
- return false
- }
- }
- fmt.Println("Volume", vid, "becomes writable")
- vl.writables = append(vl.writables, vid)
- return true
-}
-
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
- if vl.vid2location[vid].Remove(dn) {
- if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
- fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
- return vl.removeFromWritable(vid)
- }
- }
- return false
-}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
- if vl.vid2location[vid].Add(dn) {
- if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
- return vl.setVolumeWritable(vid)
- }
- }
- return false
-}
-
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
- return vl.removeFromWritable(vid)
-}
-
-func (vl *VolumeLayout) ToMap() interface{} {
- m := make(map[string]interface{})
- m["replication"] = vl.repType.String()
- m["writables"] = vl.writables
- //m["locations"] = vl.vid2location
- return m
-}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
deleted file mode 100644
index 507a240b5..000000000
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package topology
-
-type VolumeLocationList struct {
- list []*DataNode
-}
-
-func NewVolumeLocationList() *VolumeLocationList {
- return &VolumeLocationList{}
-}
-
-func (dnll *VolumeLocationList) Head() *DataNode {
- return dnll.list[0]
-}
-
-func (dnll *VolumeLocationList) Length() int {
- return len(dnll.list)
-}
-
-func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
- for _, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- return false
- }
- }
- dnll.list = append(dnll.list, loc)
- return true
-}
-
-func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
- for i, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
- return true
- }
- }
- return false
-}
-
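-// Refresh drops any data node whose LastSeen is older than freshThreshHold from
-// the location list.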
-func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
- var changed bool
- for _, dnl := range dnll.list {
- if dnl.LastSeen < freshThreshHold {
- changed = true
- break
- }
- }
- if changed {
- var l []*DataNode
- for _, dnl := range dnll.list {
- if dnl.LastSeen >= freshThreshHold {
- l = append(l, dnl)
- }
- }
- dnll.list = l
- }
-}
diff --git a/weed-fs/src/pkg/util/bytes.go b/weed-fs/src/pkg/util/bytes.go
deleted file mode 100644
index 6cc3d7018..000000000
--- a/weed-fs/src/pkg/util/bytes.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package util
-
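-// Big-endian byte-order helpers: BytesToUint64/BytesToUint32 decode fixed-width
-// integers and the UintXtoBytes functions encode them.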
-func BytesToUint64(b []byte) (v uint64) {
- length := uint(len(b))
- for i := uint(0); i < length-1; i++ {
- v += uint64(b[i])
- v <<= 8
- }
- v += uint64(b[length-1])
- return
-}
-func BytesToUint32(b []byte) (v uint32) {
- length := uint(len(b))
- for i := uint(0); i < length-1; i++ {
- v += uint32(b[i])
- v <<= 8
- }
- v += uint32(b[length-1])
- return
-}
-func Uint64toBytes(b []byte, v uint64) {
- for i := uint(0); i < 8; i++ {
- b[7-i] = byte(v >> (i * 8))
- }
-}
-func Uint32toBytes(b []byte, v uint32) {
- for i := uint(0); i < 4; i++ {
- b[3-i] = byte(v >> (i * 8))
- }
-}
-func Uint8toBytes(b []byte, v uint8) {
- b[0] = byte(v)
-}
diff --git a/weed-fs/src/pkg/util/config.go b/weed-fs/src/pkg/util/config.go
deleted file mode 100644
index 6ac8a3a65..000000000
--- a/weed-fs/src/pkg/util/config.go
+++ /dev/null
@@ -1,128 +0,0 @@
-// Copyright 2011 Numerotron Inc.
-// Use of this source code is governed by an MIT-style license
-// that can be found in the LICENSE file.
-//
-// Developed at www.stathat.com by Patrick Crosby
-// Contact us on twitter with any questions: twitter.com/stat_hat
-
-// The jconfig package provides a simple, basic configuration file parser using JSON.
-package util
-
-import (
- "bytes"
- "encoding/json"
- "log"
- "os"
-)
-
-type Config struct {
- data map[string]interface{}
- filename string
-}
-
-func newConfig() *Config {
- result := new(Config)
- result.data = make(map[string]interface{})
- return result
-}
-
-// Loads config information from a JSON file
-func LoadConfig(filename string) *Config {
- result := newConfig()
- result.filename = filename
- err := result.parse()
- if err != nil {
- log.Fatalf("error loading config file %s: %s", filename, err)
- }
- return result
-}
-
-// Loads config information from a JSON string
-func LoadConfigString(s string) *Config {
- result := newConfig()
- err := json.Unmarshal([]byte(s), &result.data)
- if err != nil {
- log.Fatalf("error parsing config string %s: %s", s, err)
- }
- return result
-}
-
-func (c *Config) StringMerge(s string) {
- next := LoadConfigString(s)
- c.merge(next.data)
-}
-
-func (c *Config) LoadMerge(filename string) {
- next := LoadConfig(filename)
- c.merge(next.data)
-}
-
-func (c *Config) merge(ndata map[string]interface{}) {
- for k, v := range ndata {
- c.data[k] = v
- }
-}
-
-func (c *Config) parse() error {
- f, err := os.Open(c.filename)
- if err != nil {
- return err
- }
- defer f.Close()
- b := new(bytes.Buffer)
- _, err = b.ReadFrom(f)
- if err != nil {
- return err
- }
- err = json.Unmarshal(b.Bytes(), &c.data)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-// Returns a string for the config variable key
-func (c *Config) GetString(key string) string {
- result, present := c.data[key]
- if !present {
- return ""
- }
- return result.(string)
-}
-
-// Returns an int for the config variable key
-func (c *Config) GetInt(key string) int {
- x, ok := c.data[key]
- if !ok {
- return -1
- }
- return int(x.(float64))
-}
-
-// Returns a float for the config variable key
-func (c *Config) GetFloat(key string) float64 {
- x, ok := c.data[key]
- if !ok {
- return -1
- }
- return x.(float64)
-}
-
-// Returns a bool for the config variable key
-func (c *Config) GetBool(key string) bool {
- x, ok := c.data[key]
- if !ok {
- return false
- }
- return x.(bool)
-}
-
-// Returns an array for the config variable key
-func (c *Config) GetArray(key string) []interface{} {
- result, present := c.data[key]
- if !present {
- return []interface{}(nil)
- }
- return result.([]interface{})
-}
diff --git a/weed-fs/src/pkg/util/parse.go b/weed-fs/src/pkg/util/parse.go
deleted file mode 100644
index 930da9522..000000000
--- a/weed-fs/src/pkg/util/parse.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package util
-
-import (
- "strconv"
-)
-
-func ParseInt(text string, defaultValue int) int {
- count, parseError := strconv.ParseUint(text, 10, 64)
- if parseError != nil {
- if len(text) > 0 {
- return 0
- }
- return defaultValue
- }
- return int(count)
-}
diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go
deleted file mode 100644
index 6e6ab0003..000000000
--- a/weed-fs/src/pkg/util/post.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package util
-
-import (
- "io/ioutil"
- "log"
- "net/http"
- "net/url"
-)
-
-func Post(url string, values url.Values) ([]byte, error) {
- r, err := http.PostForm(url, values)
- if err != nil {
- log.Println("post to", url, err)
- return nil, err
- }
- defer r.Body.Close()
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- log.Println("read post result from", url, err)
- return nil, err
- }
- return b, nil
-}