aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/operation/system_message.pb.go189
-rw-r--r--go/operation/system_message_test.go58
-rw-r--r--go/proto/Makefile4
-rw-r--r--go/proto/system_message.proto25
-rw-r--r--go/storage/store.go60
-rw-r--r--go/storage/volume_info.go23
-rw-r--r--go/topology/topology.go21
-rw-r--r--go/util/http_util.go16
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go40
9 files changed, 389 insertions, 47 deletions
diff --git a/go/operation/system_message.pb.go b/go/operation/system_message.pb.go
new file mode 100644
index 000000000..45ae8a648
--- /dev/null
+++ b/go/operation/system_message.pb.go
@@ -0,0 +1,189 @@
+// Code generated by protoc-gen-go.
+// source: system_message.proto
+// DO NOT EDIT!
+
+/*
+Package operation is a generated protocol buffer package.
+
+It is generated from these files:
+ system_message.proto
+
+It has these top-level messages:
+ VolumeInformationMessage
+ JoinMessage
+*/
+package operation
+
+import proto "code.google.com/p/goprotobuf/proto"
+import json "encoding/json"
+import math "math"
+
+// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+var _ = proto.Marshal
+var _ = &json.SyntaxError{}
+var _ = math.Inf
+
+type VolumeInformationMessage struct {
+ Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
+ Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
+ Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
+ FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
+ DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
+ DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
+ ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
+ ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
+ Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
+func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
+func (*VolumeInformationMessage) ProtoMessage() {}
+
+const Default_VolumeInformationMessage_Version uint32 = 2
+
+func (m *VolumeInformationMessage) GetId() uint32 {
+ if m != nil && m.Id != nil {
+ return *m.Id
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetSize() uint64 {
+ if m != nil && m.Size != nil {
+ return *m.Size
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetCollection() string {
+ if m != nil && m.Collection != nil {
+ return *m.Collection
+ }
+ return ""
+}
+
+func (m *VolumeInformationMessage) GetFileCount() uint64 {
+ if m != nil && m.FileCount != nil {
+ return *m.FileCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
+ if m != nil && m.DeleteCount != nil {
+ return *m.DeleteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
+ if m != nil && m.DeletedByteCount != nil {
+ return *m.DeletedByteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetReadOnly() bool {
+ if m != nil && m.ReadOnly != nil {
+ return *m.ReadOnly
+ }
+ return false
+}
+
+func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
+ if m != nil && m.ReplicaPlacement != nil {
+ return *m.ReplicaPlacement
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetVersion() uint32 {
+ if m != nil && m.Version != nil {
+ return *m.Version
+ }
+ return Default_VolumeInformationMessage_Version
+}
+
+type JoinMessage struct {
+ IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
+ Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
+ Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
+ PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
+ MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
+ MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
+ DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
+ Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
+ Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *JoinMessage) Reset() { *m = JoinMessage{} }
+func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
+func (*JoinMessage) ProtoMessage() {}
+
+func (m *JoinMessage) GetIsInit() bool {
+ if m != nil && m.IsInit != nil {
+ return *m.IsInit
+ }
+ return false
+}
+
+func (m *JoinMessage) GetIp() string {
+ if m != nil && m.Ip != nil {
+ return *m.Ip
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetPort() uint32 {
+ if m != nil && m.Port != nil {
+ return *m.Port
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetPublicUrl() string {
+ if m != nil && m.PublicUrl != nil {
+ return *m.PublicUrl
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetMaxVolumeCount() uint32 {
+ if m != nil && m.MaxVolumeCount != nil {
+ return *m.MaxVolumeCount
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetMaxFileKey() uint64 {
+ if m != nil && m.MaxFileKey != nil {
+ return *m.MaxFileKey
+ }
+ return 0
+}
+
+func (m *JoinMessage) GetDataCenter() string {
+ if m != nil && m.DataCenter != nil {
+ return *m.DataCenter
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetRack() string {
+ if m != nil && m.Rack != nil {
+ return *m.Rack
+ }
+ return ""
+}
+
+func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
+ if m != nil {
+ return m.Volumes
+ }
+ return nil
+}
+
+func init() {
+}
diff --git a/go/operation/system_message_test.go b/go/operation/system_message_test.go
new file mode 100644
index 000000000..2731d0b2f
--- /dev/null
+++ b/go/operation/system_message_test.go
@@ -0,0 +1,58 @@
+package operation
+
+import (
+ proto "code.google.com/p/goprotobuf/proto"
+ "encoding/json"
+ "log"
+ "testing"
+)
+
+func TestSerialDeserial(t *testing.T) {
+ volumeMessage := &VolumeInformationMessage{
+ Id: proto.Uint32(12),
+ Size: proto.Uint64(2341234),
+ Collection: proto.String("benchmark"),
+ FileCount: proto.Uint64(2341234),
+ DeleteCount: proto.Uint64(234),
+ DeletedByteCount: proto.Uint64(21234),
+ ReadOnly: proto.Bool(false),
+ ReplicaPlacement: proto.Uint32(210),
+ Version: proto.Uint32(2),
+ }
+ var volumeMessages []*VolumeInformationMessage
+ volumeMessages = append(volumeMessages, volumeMessage)
+
+ joinMessage := &JoinMessage{
+ IsInit: proto.Bool(true),
+ Ip: proto.String("127.0.3.12"),
+ Port: proto.Uint32(34546),
+ PublicUrl: proto.String("localhost:2342"),
+ MaxVolumeCount: proto.Uint32(210),
+ MaxFileKey: proto.Uint64(324234423),
+ DataCenter: proto.String("dc1"),
+ Rack: proto.String("rack2"),
+ Volumes: volumeMessages,
+ }
+
+ data, err := proto.Marshal(joinMessage)
+ if err != nil {
+ log.Fatal("marshaling error: ", err)
+ }
+ newMessage := &JoinMessage{}
+ err = proto.Unmarshal(data, newMessage)
+ if err != nil {
+ log.Fatal("unmarshaling error: ", err)
+ }
+ log.Println("The pb data size is", len(data))
+
+ jsonData, jsonError := json.Marshal(joinMessage)
+ if jsonError != nil {
+ log.Fatal("json marshaling error: ", jsonError)
+ }
+ log.Println("The json data size is", len(jsonData), string(jsonData))
+
+ // Now test and newTest contain the same data.
+ if *joinMessage.PublicUrl != *newMessage.PublicUrl {
+ log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
+ }
+}
diff --git a/go/proto/Makefile b/go/proto/Makefile
new file mode 100644
index 000000000..73af851dd
--- /dev/null
+++ b/go/proto/Makefile
@@ -0,0 +1,4 @@
+TARG=../operation
+
+all:
+ protoc --go_out=$(TARG) system_message.proto
diff --git a/go/proto/system_message.proto b/go/proto/system_message.proto
new file mode 100644
index 000000000..15574ad56
--- /dev/null
+++ b/go/proto/system_message.proto
@@ -0,0 +1,25 @@
+package operation;
+
+message VolumeInformationMessage {
+ required uint32 id = 1;
+ required uint64 size = 2;
+ optional string collection = 3;
+ required uint64 file_count = 4;
+ required uint64 delete_count = 5;
+ required uint64 deleted_byte_count = 6;
+ optional bool read_only = 7;
+ required uint32 replica_placement = 8;
+ optional uint32 version = 9 [default=2];
+}
+
+message JoinMessage {
+ optional bool is_init = 1;
+ required string ip = 2;
+ required uint32 port = 3;
+ optional string public_url = 4;
+ required uint32 max_volume_count = 5;
+ required uint64 max_file_key = 6;
+ optional string data_center = 7;
+ optional string rack = 8;
+ repeated VolumeInformationMessage volumes = 9;
+}
diff --git a/go/storage/store.go b/go/storage/store.go
index 8fe8f2c4c..54764cc56 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -1,6 +1,7 @@
package storage
import (
+ proto "code.google.com/p/goprotobuf/proto"
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util"
@@ -9,7 +10,6 @@ import (
"fmt"
"io/ioutil"
"math/rand"
- "net/url"
"strconv"
"strings"
)
@@ -269,40 +269,50 @@ func (s *Store) Join() (masterNode string, e error) {
if e != nil {
return
}
- stats := new([]*VolumeInfo)
+ var volumeMessages []*operation.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
for k, v := range location.volumes {
- s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
- Collection: v.Collection,
- ReplicaPlacement: v.ReplicaPlacement,
- Version: v.Version(),
- FileCount: v.nm.FileCount(),
- DeleteCount: v.nm.DeletedCount(),
- DeletedByteCount: v.nm.DeletedSize(),
- ReadOnly: v.readOnly}
- *stats = append(*stats, s)
+ volumeMessage := &operation.VolumeInformationMessage{
+ Id: proto.Uint32(uint32(k)),
+ Size: proto.Uint64(uint64(v.Size())),
+ Collection: proto.String(v.Collection),
+ FileCount: proto.Uint64(uint64(v.nm.FileCount())),
+ DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
+ DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
+ ReadOnly: proto.Bool(v.readOnly),
+ ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
+ Version: proto.Uint32(uint32(v.Version())),
+ }
+ volumeMessages = append(volumeMessages, volumeMessage)
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
}
}
- bytes, _ := json.Marshal(stats)
- values := make(url.Values)
- if !s.connected {
- values.Add("init", "true")
- }
- values.Add("port", strconv.Itoa(s.Port))
- values.Add("ip", s.Ip)
- values.Add("publicUrl", s.PublicUrl)
- values.Add("volumes", string(bytes))
- values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
- values.Add("maxFileKey", strconv.FormatUint(maxFileKey, 10))
- values.Add("dataCenter", s.dataCenter)
- values.Add("rack", s.rack)
- jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)
+
+ joinMessage := &operation.JoinMessage{
+ IsInit: proto.Bool(!s.connected),
+ Ip: proto.String(s.Ip),
+ Port: proto.Uint32(uint32(s.Port)),
+ PublicUrl: proto.String(s.PublicUrl),
+ MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
+ MaxFileKey: proto.Uint64(maxFileKey),
+ DataCenter: proto.String(s.dataCenter),
+ Rack: proto.String(s.rack),
+ Volumes: volumeMessages,
+ }
+
+ data, err := proto.Marshal(joinMessage)
+ if err != nil {
+ return "", err
+ }
+
+ println("join data size", len(data))
+
+ jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
if err != nil {
s.masterNodes.reset()
return "", err
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index 1dfb3dcae..165af1a19 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -1,6 +1,8 @@
package storage
-import ()
+import (
+ "code.google.com/p/weed-fs/go/operation"
+)
type VolumeInfo struct {
Id VolumeId
@@ -13,3 +15,22 @@ type VolumeInfo struct {
DeletedByteCount uint64
ReadOnly bool
}
+
+func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
+ vi = VolumeInfo{
+ Id: VolumeId(*m.Id),
+ Size: *m.Size,
+ Collection: *m.Collection,
+ FileCount: int(*m.FileCount),
+ DeleteCount: int(*m.DeleteCount),
+ DeletedByteCount: *m.DeletedByteCount,
+ ReadOnly: *m.ReadOnly,
+ Version: Version(*m.Version),
+ }
+ rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
+ if e != nil {
+ return vi, e
+ }
+ vi.ReplicaPlacement = rp
+ return vi, nil
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 9db3e78ae..f1daffb53 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -2,6 +2,7 @@ package topology
import (
"code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
"errors"
@@ -143,16 +144,24 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, maxFileKey uint64, dcName string, rackName string) {
- t.Sequence.SetMax(maxFileKey)
- dcName, rackName = t.configuration.Locate(ip, dcName, rackName)
+func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+ t.Sequence.SetMax(*joinMessage.MaxFileKey)
+ dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(ip, port)
- if init && dn != nil {
+ dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
+ if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
- dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+ dn = rack.GetOrCreateDataNode(*joinMessage.Ip, int(*joinMessage.Port), *joinMessage.PublicUrl, int(*joinMessage.MaxVolumeCount))
+ var volumeInfos []storage.VolumeInfo
+ for _, v := range joinMessage.Volumes {
+ if vi, err := storage.NewVolumeInfo(v); err == nil {
+ volumeInfos = append(volumeInfos, vi)
+ } else {
+ glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
+ }
+ }
dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
diff --git a/go/util/http_util.go b/go/util/http_util.go
index a33db9199..6562e964c 100644
--- a/go/util/http_util.go
+++ b/go/util/http_util.go
@@ -1,6 +1,7 @@
package util
import (
+ "bytes"
"code.google.com/p/weed-fs/go/glog"
"fmt"
"io/ioutil"
@@ -21,6 +22,21 @@ func init() {
client = &http.Client{Transport: Transport}
}
+func PostBytes(url string, body []byte) ([]byte, error) {
+ r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ }
+ defer r.Body.Close()
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ glog.V(0).Infoln("read post result from", url, err)
+ return nil, err
+ }
+ return b, nil
+}
+
func Post(url string, values url.Values) ([]byte, error) {
r, err := client.PostForm(url, values)
if err != nil {
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index e549a1dfb..bd6747e99 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -1,12 +1,15 @@
package weed_server
import (
+ proto "code.google.com/p/goprotobuf/proto"
+ "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
+ "io/ioutil"
"net/http"
"strconv"
"strings"
@@ -29,23 +32,30 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
}
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
- init := r.FormValue("init") == "true"
- ip := r.FormValue("ip")
- if ip == "" {
- ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
- }
- port, _ := strconv.Atoi(r.FormValue("port"))
- maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
- maxFileKey, _ := strconv.ParseUint(r.FormValue("maxFileKey"), 10, 64)
- s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
- publicUrl := r.FormValue("publicUrl")
- volumes := new([]storage.VolumeInfo)
- if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil {
- writeJsonQuiet(w, r, operation.JoinResult{Error: "Cannot unmarshal \"volumes\": " + err.Error()})
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ writeJsonError(w, r, err)
+ return
+ }
+ joinMessage := &operation.JoinMessage{}
+ if err = proto.Unmarshal(body, joinMessage); err != nil {
+ writeJsonError(w, r, err)
return
}
- debug(s, "volumes", r.FormValue("volumes"))
- ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, maxFileKey, r.FormValue("dataCenter"), r.FormValue("rack"))
+ if *joinMessage.Ip == "" {
+ *joinMessage.Ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
+ }
+ if glog.V(4) {
+ if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
+ glog.V(0).Infoln("json marshaling error: ", jsonError)
+ writeJsonError(w, r, jsonError)
+ return
+ } else {
+ glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
+ }
+ }
+
+ ms.Topo.RegisterVolumes(joinMessage)
writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}