aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'weed-fs/src/pkg')
-rw-r--r--weed-fs/src/pkg/storage/needle.go3
-rw-r--r--weed-fs/src/pkg/storage/needle_map.go2
-rw-r--r--weed-fs/src/pkg/storage/replication_type.go123
-rw-r--r--weed-fs/src/pkg/storage/store.go17
-rw-r--r--weed-fs/src/pkg/storage/volume.go91
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go118
-rw-r--r--weed-fs/src/pkg/topology/topology.go46
-rw-r--r--weed-fs/src/pkg/topology/topology_compact.go87
-rw-r--r--weed-fs/src/pkg/topology/topology_map.go51
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go1
10 files changed, 363 insertions, 176 deletions
diff --git a/weed-fs/src/pkg/storage/needle.go b/weed-fs/src/pkg/storage/needle.go
index e352eb55d..99765589d 100644
--- a/weed-fs/src/pkg/storage/needle.go
+++ b/weed-fs/src/pkg/storage/needle.go
@@ -116,8 +116,7 @@ func ReadNeedle(r *os.File) (*Needle, uint32) {
n.Id = util.BytesToUint64(bytes[4:12])
n.Size = util.BytesToUint32(bytes[12:16])
rest := 8 - ((n.Size + 16 + 4) % 8)
- r.Seek(int64(n.Size+4+rest), 1)
- return n, 16 + n.Size + 4 + rest
+ return n, n.Size + 4 + rest
}
func ParseKeyHash(key_hash_string string) (uint64, uint32) {
key_hash_bytes, khe := hex.DecodeString(key_hash_string)
diff --git a/weed-fs/src/pkg/storage/needle_map.go b/weed-fs/src/pkg/storage/needle_map.go
index 44b74e8c1..09a25a83e 100644
--- a/weed-fs/src/pkg/storage/needle_map.go
+++ b/weed-fs/src/pkg/storage/needle_map.go
@@ -44,9 +44,11 @@ func LoadNeedleMap(file *os.File) *NeedleMap {
size := util.BytesToUint32(bytes[i+12 : i+16])
if offset > 0 {
nm.m.Set(Key(key), offset, size)
+ log.Println("reading key", key, "offset", offset, "size", size)
nm.fileCounter++
} else {
nm.m.Delete(Key(key))
+ log.Println("removing key", key)
nm.deletionCounter++
}
}
diff --git a/weed-fs/src/pkg/storage/replication_type.go b/weed-fs/src/pkg/storage/replication_type.go
new file mode 100644
index 000000000..86a9d219d
--- /dev/null
+++ b/weed-fs/src/pkg/storage/replication_type.go
@@ -0,0 +1,123 @@
+package storage
+
+import (
+	"errors"
+	"strconv"
+)
+
+// ReplicationType identifies a replica placement policy by a short string code.
+type ReplicationType string
+
+// Replica placement policies known to this package.
+const (
+	Copy000 ReplicationType = "000" // single copy
+	Copy001 ReplicationType = "001" // 2 copies, both on the same rack and in the same data center
+	Copy010 ReplicationType = "010" // 2 copies, on different racks but in the same data center
+	Copy100 ReplicationType = "100" // 2 copies, each in a different data center
+	Copy110 ReplicationType = "110" // 3 copies, 2 on different racks in the local data center, 1 in a different data center
+	Copy200 ReplicationType = "200" // 3 copies, each in a different data center
+
+	// LengthRelicationType matches the six Copy* policies declared above.
+	LengthRelicationType = 6
+
+	// CopyNil is the nil value: the one-rune string for code point 255,
+	// which is exactly what the conversion ReplicationType(255) produces.
+	CopyNil ReplicationType = "\u00ff"
+)
+
+// NewReplicationTypeFromString converts a textual replication code into a
+// ReplicationType. Only the six known codes are accepted; any other input
+// yields Copy000 together with a non-nil error.
+func NewReplicationTypeFromString(t string) (ReplicationType, error) {
+	// Every Copy* constant is spelled exactly like its code, so a direct
+	// conversion followed by a membership check is sufficient.
+	switch candidate := ReplicationType(t); candidate {
+	case Copy000, Copy001, Copy010, Copy100, Copy110, Copy200:
+		return candidate, nil
+	}
+	return Copy000, errors.New("Unknown Replication Type:" + t)
+}
+// NewReplicationTypeFromByte converts the one-byte encoding of a replication
+// policy (the values returned by ReplicationType.Byte) back into a
+// ReplicationType. An unrecognized byte yields Copy000 and a non-nil error
+// that reports the byte as a decimal number.
+//
+// The byte values are kept exactly as written so they keep matching
+// ReplicationType.Byte; note that literals with a leading zero are octal.
+func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
+	switch b {
+	case byte(000):
+		return Copy000, nil
+	case byte(001):
+		return Copy001, nil
+	case byte(010): // octal literal: the byte value is 8, not 10
+		return Copy010, nil
+	case byte(100):
+		return Copy100, nil
+	case byte(110):
+		return Copy110, nil
+	case byte(200):
+		return Copy200, nil
+	}
+	// string(b) would append a single (often unprintable) rune; format the
+	// numeric value instead so the message is readable.
+	return Copy000, errors.New("Unknown Replication Type:" + strconv.Itoa(int(b)))
+}
+
+// String returns the textual code of the replication type. Values that are
+// not one of the six known policies are reported as "000".
+func (r *ReplicationType) String() string {
+	switch *r {
+	case Copy000, Copy001, Copy010, Copy100, Copy110, Copy200:
+		// Each known constant is spelled exactly like its code.
+		return string(*r)
+	}
+	return "000"
+}
+func (r *ReplicationType) Byte() byte {
+ switch *r {
+ case Copy000:
+ return byte(000)
+ case Copy001:
+ return byte(001)
+ case Copy010:
+ return byte(010)
+ case Copy100:
+ return byte(100)
+ case Copy110:
+ return byte(110)
+ case Copy200:
+ return byte(200)
+ }
+ return byte(000)
+}
+
+// GetReplicationLevelIndex returns the position (0 through 5) of the
+// replication type in the declaration order of the Copy* policies, or -1
+// when the value is not one of them.
+func (repType ReplicationType) GetReplicationLevelIndex() int {
+	ordered := [...]ReplicationType{Copy000, Copy001, Copy010, Copy100, Copy110, Copy200}
+	for position, known := range ordered {
+		if known == repType {
+			return position
+		}
+	}
+	return -1
+}
+// GetCopyCount returns how many copies the replication type asks for, or 0
+// when the value is not one of the known policies.
+func (repType ReplicationType) GetCopyCount() int {
+	switch repType {
+	case Copy000:
+		return 1
+	case Copy001, Copy010, Copy100:
+		return 2
+	case Copy110, Copy200:
+		return 3
+	default:
+		return 0
+	}
+}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index 225fc9d92..6beb224f5 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -36,7 +36,7 @@ func (s *Store) AddVolume(volumeListString string, replicationType string) error
for _, range_string := range strings.Split(volumeListString, ",") {
if strings.Index(range_string, "-") < 0 {
id_string := range_string
- id, err := strconv.ParseUint(id_string, 10, 64)
+ id, err := NewVolumeId(id_string)
if err != nil {
return errors.New("Volume Id " + id_string + " is not a valid unsigned integer!")
}
@@ -68,6 +68,21 @@ func (s *Store) addVolume(vid VolumeId, replicationType ReplicationType) error {
s.volumes[vid] = NewVolume(s.dir, vid, replicationType)
return nil
}
+
+// CompactVolume parses volumeIdString and runs compact on the matching
+// volume of this store. It returns an error when the id cannot be parsed,
+// when the store holds no such volume, or when compaction itself fails.
+func (s *Store) CompactVolume(volumeIdString string) error {
+	vid, err := NewVolumeId(volumeIdString)
+	if err != nil {
+		return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+	}
+	// An id this store does not hold yields a nil volume; calling compact on
+	// it would dereference nil, so report it as an error instead.
+	v := s.volumes[vid]
+	if v == nil {
+		return errors.New("Volume Id " + volumeIdString + " is not found on this store")
+	}
+	return v.compact()
+}
+// CommitCompactVolume parses volumeIdString and runs commitCompact on the
+// matching volume of this store, returning its result. It returns 0 and an
+// error when the id cannot be parsed or the store holds no such volume.
+func (s *Store) CommitCompactVolume(volumeIdString string) (int, error) {
+	vid, err := NewVolumeId(volumeIdString)
+	if err != nil {
+		return 0, errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+	}
+	// An id this store does not hold yields a nil volume; calling
+	// commitCompact on it would dereference nil, so report it instead.
+	v := s.volumes[vid]
+	if v == nil {
+		return 0, errors.New("Volume Id " + volumeIdString + " is not found on this store")
+	}
+	return v.commitCompact()
+}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index a9713c36b..ee1d98b6a 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -1,11 +1,11 @@
package storage
import (
+ "errors"
"log"
"os"
"path"
"sync"
- "errors"
)
const (
@@ -21,29 +21,30 @@ type Volume struct {
replicaType ReplicationType
accessLock sync.Mutex
-
}
func NewVolume(dirname string, id VolumeId, replicationType ReplicationType) (v *Volume) {
- var e error
v = &Volume{dir: dirname, Id: id, replicaType: replicationType}
- fileName := id.String()
- v.dataFile, e = os.OpenFile(path.Join(v.dir, fileName+".dat"), os.O_RDWR|os.O_CREATE, 0644)
+ v.load()
+ return
+}
+func (v *Volume) load() {
+ var e error
+ fileName := path.Join(v.dir, v.Id.String())
+ v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
log.Fatalf("New Volume [ERROR] %s\n", e)
}
- if replicationType == CopyNil {
+ if v.replicaType == CopyNil {
v.readSuperBlock()
} else {
v.maybeWriteSuperBlock()
}
- indexFile, ie := os.OpenFile(path.Join(v.dir, fileName+".idx"), os.O_RDWR|os.O_CREATE, 0644)
+ indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
if ie != nil {
log.Fatalf("Write Volume Index [ERROR] %s\n", ie)
}
v.nm = LoadNeedleMap(indexFile)
-
- return
}
func (v *Volume) Size() int64 {
stat, e := v.dataFile.Stat()
@@ -107,3 +108,75 @@ func (v *Volume) read(n *Needle) (int, error) {
}
return -1, errors.New("Not Found")
}
+
+// compact copies this volume's ".dat" file into a ".cpd" file and writes a
+// matching ".cpx" index next to it, holding accessLock for the whole copy.
+// The volume's current files are left in place.
+func (v *Volume) compact() error {
+	v.accessLock.Lock()
+	defer v.accessLock.Unlock()
+
+	base := path.Join(v.dir, v.Id.String())
+	dataPath, compactedData, compactedIndex := base+".dat", base+".cpd", base+".cpx"
+	return v.copyDataAndGenerateIndexFile(dataPath, compactedData, compactedIndex)
+}
+// commitCompact replaces the volume's ".dat" and ".idx" files with the
+// ".cpd" and ".cpx" files written by compact, then reloads the volume from
+// disk. The int result is always 0.
+//
+// The first failure among closing the data file and the two renames is
+// returned, and later steps are skipped. The volume is reloaded in every
+// case so it is never left holding a closed data file.
+func (v *Volume) commitCompact() (int, error) {
+	v.accessLock.Lock()
+	defer v.accessLock.Unlock()
+
+	base := path.Join(v.dir, v.Id.String())
+	err := v.dataFile.Close()
+	if err == nil {
+		err = os.Rename(base+".cpd", base+".dat")
+	}
+	if err == nil {
+		// NOTE(review): the two renames are not atomic as a pair; if this
+		// one fails the new data file is paired with the old index.
+		err = os.Rename(base+".cpx", base+".idx")
+	}
+	v.load()
+	return 0, err
+}
+
+// copyDataAndGenerateIndexFile walks the needles stored in srcName and
+// copies the ones that are still live into dstName, recording each copied
+// needle in a fresh needle map backed by idxName.
+//
+// A needle is live when the volume's needle map has an entry for its id
+// whose offset (stored in units of 8 bytes) equals the needle's position in
+// srcName and whose size is non-zero. Every other needle is skipped.
+//
+// dstName and idxName are truncated first so that output left behind by an
+// earlier attempt cannot leak into the result. An error is returned when a
+// file cannot be opened, when the super block cannot be copied, or when a
+// needle body cannot be read.
+func (v *Volume) copyDataAndGenerateIndexFile(srcName, dstName, idxName string) (err error) {
+	src, err := os.OpenFile(srcName, os.O_RDONLY, 0644)
+	if err != nil {
+		return err
+	}
+	defer src.Close()
+
+	dst, err := os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return err
+	}
+	defer dst.Close()
+
+	idx, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return err
+	}
+	defer idx.Close()
+
+	// The super block is copied verbatim; without it the copy is unusable,
+	// so a failure here aborts the compaction.
+	src.Seek(0, 0)
+	header := make([]byte, SuperBlockSize)
+	if _, err = src.Read(header); err != nil {
+		return err
+	}
+	if _, err = dst.Write(header); err != nil {
+		return err
+	}
+
+	nm := NewNeedleMap(idx)
+	old_offset := uint32(SuperBlockSize)
+	new_offset := uint32(SuperBlockSize)
+	// ReadNeedle leaves the reader just past the 16-byte needle header and
+	// returns in rest how many bytes (data, checksum, padding) still belong
+	// to that needle.
+	for n, rest := ReadNeedle(src); n != nil; n, rest = ReadNeedle(src) {
+		nv, ok := v.nm.Get(n.Id)
+		//log.Println("file size is", n.Size, "rest", rest)
+		switch {
+		case !ok:
+			// nv carries no usable entry here, so it must not be read.
+			log.Println("key", n.Id, "is not in the needle map, skipping volume offset", old_offset, "data_size", n.Size, "rest", rest)
+			src.Seek(int64(rest), 1)
+		case nv.Offset*8 != old_offset:
+			log.Println("expected offset should be", nv.Offset*8, "skipping", (rest - 16), "key", n.Id, "volume offset", old_offset, "data_size", n.Size, "rest", rest)
+			src.Seek(int64(rest), 1)
+		case nv.Size == 0:
+			// Nothing was read for this needle, so the whole remainder has
+			// to be skipped, not only the padding.
+			src.Seek(int64(rest), 1)
+		default:
+			nm.Put(n.Id, new_offset/8, n.Size)
+			bytes := make([]byte, n.Size+4)
+			if _, err = src.Read(bytes); err != nil {
+				return err
+			}
+			n.Data = bytes[:n.Size]
+			n.Checksum = NewCRC(n.Data)
+			// NOTE(review): the results of Put and Append are not visible
+			// from here and are still ignored — confirm and check them.
+			n.Append(dst)
+			new_offset += rest + 16
+			log.Println("saving key", n.Id, "volume offset", old_offset, "=>", new_offset, "data_size", n.Size, "rest", rest)
+			// Data and checksum were consumed above; skip the padding.
+			src.Seek(int64(rest-n.Size-4), 1)
+		}
+		old_offset += rest + 16
+	}
+
+	return nil
+}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index b8eb62f0a..05b9e6205 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -1,7 +1,6 @@
package storage
import (
- "errors"
)
type VolumeInfo struct {
@@ -11,120 +10,3 @@ type VolumeInfo struct {
FileCount int
DeleteCount int
}
-type ReplicationType string
-
-const (
- Copy000 = ReplicationType("000") // single copy
- Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
- Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
- Copy100 = ReplicationType("100") // 2 copies, each on different data center
- Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
- Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
- LengthRelicationType = 6
- CopyNil = ReplicationType(255) // nil value
-)
-
-func NewReplicationTypeFromString(t string) (ReplicationType, error) {
- switch t {
- case "000":
- return Copy000, nil
- case "001":
- return Copy001, nil
- case "010":
- return Copy010, nil
- case "100":
- return Copy100, nil
- case "110":
- return Copy110, nil
- case "200":
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:"+t)
-}
-func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
- switch b {
- case byte(000):
- return Copy000, nil
- case byte(001):
- return Copy001, nil
- case byte(010):
- return Copy010, nil
- case byte(100):
- return Copy100, nil
- case byte(110):
- return Copy110, nil
- case byte(200):
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:"+string(b))
-}
-
-func (r *ReplicationType) String() string {
- switch *r {
- case Copy000:
- return "000"
- case Copy001:
- return "001"
- case Copy010:
- return "010"
- case Copy100:
- return "100"
- case Copy110:
- return "110"
- case Copy200:
- return "200"
- }
- return "000"
-}
-func (r *ReplicationType) Byte() byte {
- switch *r {
- case Copy000:
- return byte(000)
- case Copy001:
- return byte(001)
- case Copy010:
- return byte(010)
- case Copy100:
- return byte(100)
- case Copy110:
- return byte(110)
- case Copy200:
- return byte(200)
- }
- return byte(000)
-}
-
-func (repType ReplicationType)GetReplicationLevelIndex() int {
- switch repType {
- case Copy000:
- return 0
- case Copy001:
- return 1
- case Copy010:
- return 2
- case Copy100:
- return 3
- case Copy110:
- return 4
- case Copy200:
- return 5
- }
- return -1
-}
-func (repType ReplicationType)GetCopyCount() int {
- switch repType {
- case Copy000:
- return 1
- case Copy001:
- return 2
- case Copy010:
- return 2
- case Copy100:
- return 2
- case Copy110:
- return 3
- case Copy200:
- return 3
- }
- return 0
-}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index 836a5cc69..52d13135f 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -143,49 +143,3 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
return dc
}
-func (t *Topology) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- var dcs []interface{}
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- dcs = append(dcs, dc.ToMap())
- }
- m["DataCenters"] = dcs
- var layouts []interface{}
- for _, layout := range t.replicaType2VolumeLayout {
- if layout != nil {
- layouts = append(layouts, layout.ToMap())
- }
- }
- m["layouts"] = layouts
- return m
-}
-
-func (t *Topology) ToVolumeMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- dcs := make(map[NodeId]interface{})
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- racks := make(map[NodeId]interface{})
- for _, r := range dc.Children() {
- rack := r.(*Rack)
- dataNodes := make(map[NodeId]interface{})
- for _, d := range rack.Children() {
- dn := d.(*DataNode)
- var volumes []interface{}
- for _, v := range dn.volumes {
- volumes = append(volumes, v)
- }
- dataNodes[d.Id()] = volumes
- }
- racks[r.Id()] = dataNodes
- }
- dcs[dc.Id()] = racks
- }
- m["DataCenters"] = dcs
- return m
-}
diff --git a/weed-fs/src/pkg/topology/topology_compact.go b/weed-fs/src/pkg/topology/topology_compact.go
new file mode 100644
index 000000000..e6232cec6
--- /dev/null
+++ b/weed-fs/src/pkg/topology/topology_compact.go
@@ -0,0 +1,87 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "pkg/storage"
+ "pkg/util"
+ "time"
+)
+
+func (t *Topology) Vacuum() int {
+ total_counter := 0
+ for _, vl := range t.replicaType2VolumeLayout {
+ if vl != nil {
+ for vid, locationlist := range vl.vid2location {
+ each_volume_counter := 0
+ vl.removeFromWritable(vid)
+ ch := make(chan int, locationlist.Length())
+ for _, dn := range locationlist.list {
+ go func(url string, vid storage.VolumeId) {
+ vacuumVolume_Compact(url, vid)
+ }(dn.Url(), vid)
+ }
+ for _ = range locationlist.list {
+ select {
+ case count := <-ch:
+ each_volume_counter += count
+ case <-time.After(30 * time.Minute):
+ each_volume_counter = 0
+ break
+ }
+ }
+ if each_volume_counter > 0 {
+ for _, dn := range locationlist.list {
+ if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+ fmt.Println("Error when committing on", dn.Url(), e)
+ panic(e)
+ }
+ }
+ vl.setVolumeWritable(vid)
+ total_counter += each_volume_counter
+ }
+ }
+ }
+ }
+ return 0
+}
+
+// VacuumVolumeResult is the JSON reply decoded from the vacuum_volume_compact
+// and vacuum_volume_commit admin endpoints.
+type VacuumVolumeResult struct {
+	// Bytes is the count reported by the server for the operation.
+	Bytes int
+	// Error is empty on success and carries the server's message otherwise.
+	Error string
+}
+
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) (error, int) {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
+ if err != nil {
+ return err, 0
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err, 0
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error), 0
+ }
+ return nil, ret.Bytes
+}
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
diff --git a/weed-fs/src/pkg/topology/topology_map.go b/weed-fs/src/pkg/topology/topology_map.go
new file mode 100644
index 000000000..9ccf08ae3
--- /dev/null
+++ b/weed-fs/src/pkg/topology/topology_map.go
@@ -0,0 +1,51 @@
+package topology
+
+import (
+)
+
+// ToMap builds a generic map describing the topology: the "Max" and "Free"
+// values, the ToMap form of every child data center under "DataCenters",
+// and the ToMap form of every non-nil volume layout under "layouts".
+func (t *Topology) ToMap() interface{} {
+	summary := map[string]interface{}{}
+	summary["Max"] = t.GetMaxVolumeCount()
+	summary["Free"] = t.FreeSpace()
+
+	// Both slices stay nil when there is nothing to list.
+	var dataCenters []interface{}
+	for _, child := range t.Children() {
+		dataCenters = append(dataCenters, child.(*DataCenter).ToMap())
+	}
+	summary["DataCenters"] = dataCenters
+
+	var layoutMaps []interface{}
+	for _, vl := range t.replicaType2VolumeLayout {
+		if vl == nil {
+			continue
+		}
+		layoutMaps = append(layoutMaps, vl.ToMap())
+	}
+	summary["layouts"] = layoutMaps
+
+	return summary
+}
+
+// ToVolumeMap builds a generic map describing the topology: the "Max" and
+// "Free" values plus, under "DataCenters", a nested map keyed by data
+// center id, then rack id, then data node id, whose leaves are the volumes
+// held by each data node.
+func (t *Topology) ToVolumeMap() interface{} {
+	summary := map[string]interface{}{}
+	summary["Max"] = t.GetMaxVolumeCount()
+	summary["Free"] = t.FreeSpace()
+
+	byDataCenter := map[NodeId]interface{}{}
+	for _, dcNode := range t.Children() {
+		dc := dcNode.(*DataCenter)
+		byRack := map[NodeId]interface{}{}
+		for _, rackNode := range dc.Children() {
+			rack := rackNode.(*Rack)
+			byDataNode := map[NodeId]interface{}{}
+			for _, node := range rack.Children() {
+				// The slice stays nil for a data node without volumes.
+				var held []interface{}
+				for _, vol := range node.(*DataNode).volumes {
+					held = append(held, vol)
+				}
+				byDataNode[node.Id()] = held
+			}
+			byRack[rackNode.Id()] = byDataNode
+		}
+		byDataCenter[dc.Id()] = byRack
+	}
+	summary["DataCenters"] = byDataCenter
+
+	return summary
+}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
index 16afb2dfb..64d8cdf43 100644
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -27,6 +27,7 @@ func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
dnll.list = append(dnll.list, loc)
return true
}
+
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {