aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2017-01-10 01:01:12 -0800
committerChris Lu <chris.lu@gmail.com>2017-01-10 01:01:12 -0800
commite46c3415f752e2e0c252c420adb882c4bcb7416b (patch)
tree65bb66899c2e97ebc340f09e7951e2f663ec4d78
parent4beaaa06505220c80d502d7b3ebd8b8b71071f5f (diff)
downloadseaweedfs-e46c3415f752e2e0c252c420adb882c4bcb7416b.tar.xz
seaweedfs-e46c3415f752e2e0c252c420adb882c4bcb7416b.zip
gRpc for master~volume heartbeat
-rw-r--r--weed/command/master.go27
-rw-r--r--weed/command/server.go28
-rw-r--r--weed/command/volume.go2
-rw-r--r--weed/operation/system_message.pb.go203
-rw-r--r--weed/operation/system_message_test.go59
-rw-r--r--weed/pb/Makefile6
-rw-r--r--weed/pb/seaweed.pb.go384
-rw-r--r--weed/pb/seaweed.proto41
-rw-r--r--weed/server/master_grpc_server.go57
-rw-r--r--weed/server/master_server.go1
-rw-r--r--weed/server/master_server_handlers_admin.go36
-rw-r--r--weed/server/volume_grpc_client.go74
-rw-r--r--weed/server/volume_server.go34
-rw-r--r--weed/storage/store.go102
-rw-r--r--weed/storage/volume_info.go25
-rw-r--r--weed/topology/data_node.go3
-rw-r--r--weed/topology/node.go6
-rw-r--r--weed/topology/rack.go5
-rw-r--r--weed/topology/topology.go40
-rw-r--r--weed/topology/topology_event_handling.go14
20 files changed, 664 insertions, 483 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index ec54fbd7b..eee22810b 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -10,9 +10,13 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
+ "github.com/soheilhy/cmux"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
)
func init() {
@@ -39,7 +43,7 @@ var (
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file")
defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
- mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds")
+ mTimeout = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds")
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@@ -99,8 +103,25 @@ func runMaster(cmd *Command, args []string) bool {
ms.SetRaftServer(raftServer)
}()
- if e := http.Serve(listener, r); e != nil {
- glog.Fatalf("Fail to serve: %v", e)
+ // start grpc and http server
+ m := cmux.New(listener)
+
+ grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
+ httpL := m.Match(cmux.Any())
+
+ // Create your protocol servers.
+ grpcS := grpc.NewServer()
+ pb.RegisterSeaweedServer(grpcS, ms)
+ reflection.Register(grpcS)
+
+ httpS := &http.Server{Handler: r}
+
+ go grpcS.Serve(grpcL)
+ go httpS.Serve(httpL)
+
+ if err := m.Serve(); err != nil {
+ glog.Fatalf("master server failed to serve: %v", err)
}
+
return true
}
diff --git a/weed/command/server.go b/weed/command/server.go
index 87146940f..5bde22517 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -11,10 +11,14 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
+ "github.com/soheilhy/cmux"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
)
type ServerOptions struct {
@@ -51,7 +55,7 @@ var (
serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds")
+ serverTimeout = cmdServer.Flag.Int("idleTimeout", 30, "connection idle seconds")
serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
@@ -230,9 +234,27 @@ func runServer(cmd *Command, args []string) bool {
}()
raftWaitForMaster.Done()
- if e := http.Serve(masterListener, r); e != nil {
- glog.Fatalf("Master Fail to serve:%s", e.Error())
+
+ // start grpc and http server
+ m := cmux.New(masterListener)
+
+ grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
+ httpL := m.Match(cmux.Any())
+
+ // Create your protocol servers.
+ grpcS := grpc.NewServer()
+ pb.RegisterSeaweedServer(grpcS, ms)
+ reflection.Register(grpcS)
+
+ httpS := &http.Server{Handler: r}
+
+ go grpcS.Serve(grpcL)
+ go httpS.Serve(httpL)
+
+ if err := m.Serve(); err != nil {
+ glog.Fatalf("master server failed to serve: %v", err)
}
+
}()
volumeWait.Wait()
diff --git a/weed/command/volume.go b/weed/command/volume.go
index ba498b8e4..0e69325b6 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -48,7 +48,7 @@ func init() {
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
- v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
+ v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
diff --git a/weed/operation/system_message.pb.go b/weed/operation/system_message.pb.go
deleted file mode 100644
index 742a1ca4e..000000000
--- a/weed/operation/system_message.pb.go
+++ /dev/null
@@ -1,203 +0,0 @@
-// Code generated by protoc-gen-go.
-// source: system_message.proto
-// DO NOT EDIT!
-
-/*
-Package operation is a generated protocol buffer package.
-
-It is generated from these files:
- system_message.proto
-
-It has these top-level messages:
- VolumeInformationMessage
- JoinMessage
-*/
-package operation
-
-import proto "github.com/golang/protobuf/proto"
-import math "math"
-
-// Reference imports to suppress errors if they are not otherwise used.
-var _ = proto.Marshal
-var _ = math.Inf
-
-type VolumeInformationMessage struct {
- Id *uint32 `protobuf:"varint,1,req,name=id" json:"id,omitempty"`
- Size *uint64 `protobuf:"varint,2,req,name=size" json:"size,omitempty"`
- Collection *string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
- FileCount *uint64 `protobuf:"varint,4,req,name=file_count" json:"file_count,omitempty"`
- DeleteCount *uint64 `protobuf:"varint,5,req,name=delete_count" json:"delete_count,omitempty"`
- DeletedByteCount *uint64 `protobuf:"varint,6,req,name=deleted_byte_count" json:"deleted_byte_count,omitempty"`
- ReadOnly *bool `protobuf:"varint,7,opt,name=read_only" json:"read_only,omitempty"`
- ReplicaPlacement *uint32 `protobuf:"varint,8,req,name=replica_placement" json:"replica_placement,omitempty"`
- Version *uint32 `protobuf:"varint,9,opt,name=version,def=2" json:"version,omitempty"`
- Ttl *uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
- XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
-func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
-func (*VolumeInformationMessage) ProtoMessage() {}
-
-const Default_VolumeInformationMessage_Version uint32 = 2
-
-func (m *VolumeInformationMessage) GetId() uint32 {
- if m != nil && m.Id != nil {
- return *m.Id
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetSize() uint64 {
- if m != nil && m.Size != nil {
- return *m.Size
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetCollection() string {
- if m != nil && m.Collection != nil {
- return *m.Collection
- }
- return ""
-}
-
-func (m *VolumeInformationMessage) GetFileCount() uint64 {
- if m != nil && m.FileCount != nil {
- return *m.FileCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
- if m != nil && m.DeleteCount != nil {
- return *m.DeleteCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
- if m != nil && m.DeletedByteCount != nil {
- return *m.DeletedByteCount
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetReadOnly() bool {
- if m != nil && m.ReadOnly != nil {
- return *m.ReadOnly
- }
- return false
-}
-
-func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
- if m != nil && m.ReplicaPlacement != nil {
- return *m.ReplicaPlacement
- }
- return 0
-}
-
-func (m *VolumeInformationMessage) GetVersion() uint32 {
- if m != nil && m.Version != nil {
- return *m.Version
- }
- return Default_VolumeInformationMessage_Version
-}
-
-func (m *VolumeInformationMessage) GetTtl() uint32 {
- if m != nil && m.Ttl != nil {
- return *m.Ttl
- }
- return 0
-}
-
-type JoinMessage struct {
- IsInit *bool `protobuf:"varint,1,opt,name=is_init" json:"is_init,omitempty"`
- Ip *string `protobuf:"bytes,2,req,name=ip" json:"ip,omitempty"`
- Port *uint32 `protobuf:"varint,3,req,name=port" json:"port,omitempty"`
- PublicUrl *string `protobuf:"bytes,4,opt,name=public_url" json:"public_url,omitempty"`
- MaxVolumeCount *uint32 `protobuf:"varint,5,req,name=max_volume_count" json:"max_volume_count,omitempty"`
- MaxFileKey *uint64 `protobuf:"varint,6,req,name=max_file_key" json:"max_file_key,omitempty"`
- DataCenter *string `protobuf:"bytes,7,opt,name=data_center" json:"data_center,omitempty"`
- Rack *string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
- Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
- AdminPort *uint32 `protobuf:"varint,10,opt,name=admin_port" json:"admin_port,omitempty"`
- XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *JoinMessage) Reset() { *m = JoinMessage{} }
-func (m *JoinMessage) String() string { return proto.CompactTextString(m) }
-func (*JoinMessage) ProtoMessage() {}
-
-func (m *JoinMessage) GetIsInit() bool {
- if m != nil && m.IsInit != nil {
- return *m.IsInit
- }
- return false
-}
-
-func (m *JoinMessage) GetIp() string {
- if m != nil && m.Ip != nil {
- return *m.Ip
- }
- return ""
-}
-
-func (m *JoinMessage) GetPort() uint32 {
- if m != nil && m.Port != nil {
- return *m.Port
- }
- return 0
-}
-
-func (m *JoinMessage) GetPublicUrl() string {
- if m != nil && m.PublicUrl != nil {
- return *m.PublicUrl
- }
- return ""
-}
-
-func (m *JoinMessage) GetMaxVolumeCount() uint32 {
- if m != nil && m.MaxVolumeCount != nil {
- return *m.MaxVolumeCount
- }
- return 0
-}
-
-func (m *JoinMessage) GetMaxFileKey() uint64 {
- if m != nil && m.MaxFileKey != nil {
- return *m.MaxFileKey
- }
- return 0
-}
-
-func (m *JoinMessage) GetDataCenter() string {
- if m != nil && m.DataCenter != nil {
- return *m.DataCenter
- }
- return ""
-}
-
-func (m *JoinMessage) GetRack() string {
- if m != nil && m.Rack != nil {
- return *m.Rack
- }
- return ""
-}
-
-func (m *JoinMessage) GetVolumes() []*VolumeInformationMessage {
- if m != nil {
- return m.Volumes
- }
- return nil
-}
-
-func (m *JoinMessage) GetAdminPort() uint32 {
- if m != nil && m.AdminPort != nil {
- return *m.AdminPort
- }
- return 0
-}
-
-func init() {
-}
diff --git a/weed/operation/system_message_test.go b/weed/operation/system_message_test.go
deleted file mode 100644
index d18ca49a4..000000000
--- a/weed/operation/system_message_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package operation
-
-import (
- "encoding/json"
- "log"
- "testing"
-
- "github.com/golang/protobuf/proto"
-)
-
-func TestSerialDeserial(t *testing.T) {
- volumeMessage := &VolumeInformationMessage{
- Id: proto.Uint32(12),
- Size: proto.Uint64(2341234),
- Collection: proto.String("benchmark"),
- FileCount: proto.Uint64(2341234),
- DeleteCount: proto.Uint64(234),
- DeletedByteCount: proto.Uint64(21234),
- ReadOnly: proto.Bool(false),
- ReplicaPlacement: proto.Uint32(210),
- Version: proto.Uint32(2),
- }
- var volumeMessages []*VolumeInformationMessage
- volumeMessages = append(volumeMessages, volumeMessage)
-
- joinMessage := &JoinMessage{
- IsInit: proto.Bool(true),
- Ip: proto.String("127.0.3.12"),
- Port: proto.Uint32(34546),
- PublicUrl: proto.String("localhost:2342"),
- MaxVolumeCount: proto.Uint32(210),
- MaxFileKey: proto.Uint64(324234423),
- DataCenter: proto.String("dc1"),
- Rack: proto.String("rack2"),
- Volumes: volumeMessages,
- }
-
- data, err := proto.Marshal(joinMessage)
- if err != nil {
- log.Fatal("marshaling error: ", err)
- }
- newMessage := &JoinMessage{}
- err = proto.Unmarshal(data, newMessage)
- if err != nil {
- log.Fatal("unmarshaling error: ", err)
- }
- log.Println("The pb data size is", len(data))
-
- jsonData, jsonError := json.Marshal(joinMessage)
- if jsonError != nil {
- log.Fatal("json marshaling error: ", jsonError)
- }
- log.Println("The json data size is", len(jsonData), string(jsonData))
-
- // Now test and newTest contain the same data.
- if *joinMessage.PublicUrl != *newMessage.PublicUrl {
- log.Fatalf("data mismatch %q != %q", *joinMessage.PublicUrl, *newMessage.PublicUrl)
- }
-}
diff --git a/weed/pb/Makefile b/weed/pb/Makefile
new file mode 100644
index 000000000..8d0eb7854
--- /dev/null
+++ b/weed/pb/Makefile
@@ -0,0 +1,6 @@
+all: gen
+
+.PHONY : gen
+
+gen:
+ protoc seaweed.proto --go_out=plugins=grpc:.
diff --git a/weed/pb/seaweed.pb.go b/weed/pb/seaweed.pb.go
new file mode 100644
index 000000000..02de2d8a6
--- /dev/null
+++ b/weed/pb/seaweed.pb.go
@@ -0,0 +1,384 @@
+// Code generated by protoc-gen-go.
+// source: seaweed.proto
+// DO NOT EDIT!
+
+/*
+Package pb is a generated protocol buffer package.
+
+It is generated from these files:
+ seaweed.proto
+
+It has these top-level messages:
+ Heartbeat
+ HeartbeatResponse
+ VolumeInformationMessage
+*/
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import (
+ context "golang.org/x/net/context"
+ grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Heartbeat struct {
+ IsInit bool `protobuf:"varint,1,opt,name=is_init,json=isInit" json:"is_init,omitempty"`
+ Ip string `protobuf:"bytes,2,opt,name=ip" json:"ip,omitempty"`
+ Port uint32 `protobuf:"varint,3,opt,name=port" json:"port,omitempty"`
+ PublicUrl string `protobuf:"bytes,4,opt,name=public_url,json=publicUrl" json:"public_url,omitempty"`
+ MaxVolumeCount uint32 `protobuf:"varint,5,opt,name=max_volume_count,json=maxVolumeCount" json:"max_volume_count,omitempty"`
+ MaxFileKey uint64 `protobuf:"varint,6,opt,name=max_file_key,json=maxFileKey" json:"max_file_key,omitempty"`
+ DataCenter string `protobuf:"bytes,7,opt,name=data_center,json=dataCenter" json:"data_center,omitempty"`
+ Rack string `protobuf:"bytes,8,opt,name=rack" json:"rack,omitempty"`
+ Volumes []*VolumeInformationMessage `protobuf:"bytes,9,rep,name=volumes" json:"volumes,omitempty"`
+ AdminPort uint32 `protobuf:"varint,10,opt,name=admin_port,json=adminPort" json:"admin_port,omitempty"`
+}
+
+func (m *Heartbeat) Reset() { *m = Heartbeat{} }
+func (m *Heartbeat) String() string { return proto.CompactTextString(m) }
+func (*Heartbeat) ProtoMessage() {}
+func (*Heartbeat) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Heartbeat) GetIsInit() bool {
+ if m != nil {
+ return m.IsInit
+ }
+ return false
+}
+
+func (m *Heartbeat) GetIp() string {
+ if m != nil {
+ return m.Ip
+ }
+ return ""
+}
+
+func (m *Heartbeat) GetPort() uint32 {
+ if m != nil {
+ return m.Port
+ }
+ return 0
+}
+
+func (m *Heartbeat) GetPublicUrl() string {
+ if m != nil {
+ return m.PublicUrl
+ }
+ return ""
+}
+
+func (m *Heartbeat) GetMaxVolumeCount() uint32 {
+ if m != nil {
+ return m.MaxVolumeCount
+ }
+ return 0
+}
+
+func (m *Heartbeat) GetMaxFileKey() uint64 {
+ if m != nil {
+ return m.MaxFileKey
+ }
+ return 0
+}
+
+func (m *Heartbeat) GetDataCenter() string {
+ if m != nil {
+ return m.DataCenter
+ }
+ return ""
+}
+
+func (m *Heartbeat) GetRack() string {
+ if m != nil {
+ return m.Rack
+ }
+ return ""
+}
+
+func (m *Heartbeat) GetVolumes() []*VolumeInformationMessage {
+ if m != nil {
+ return m.Volumes
+ }
+ return nil
+}
+
+func (m *Heartbeat) GetAdminPort() uint32 {
+ if m != nil {
+ return m.AdminPort
+ }
+ return 0
+}
+
+type HeartbeatResponse struct {
+ VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volumeSizeLimit" json:"volumeSizeLimit,omitempty"`
+ SecretKey string `protobuf:"bytes,2,opt,name=secretKey" json:"secretKey,omitempty"`
+}
+
+func (m *HeartbeatResponse) Reset() { *m = HeartbeatResponse{} }
+func (m *HeartbeatResponse) String() string { return proto.CompactTextString(m) }
+func (*HeartbeatResponse) ProtoMessage() {}
+func (*HeartbeatResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *HeartbeatResponse) GetVolumeSizeLimit() uint64 {
+ if m != nil {
+ return m.VolumeSizeLimit
+ }
+ return 0
+}
+
+func (m *HeartbeatResponse) GetSecretKey() string {
+ if m != nil {
+ return m.SecretKey
+ }
+ return ""
+}
+
+type VolumeInformationMessage struct {
+ Id uint32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
+ Size uint64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"`
+ Collection string `protobuf:"bytes,3,opt,name=collection" json:"collection,omitempty"`
+ FileCount uint64 `protobuf:"varint,4,opt,name=file_count,json=fileCount" json:"file_count,omitempty"`
+ DeleteCount uint64 `protobuf:"varint,5,opt,name=delete_count,json=deleteCount" json:"delete_count,omitempty"`
+ DeletedByteCount uint64 `protobuf:"varint,6,opt,name=deleted_byte_count,json=deletedByteCount" json:"deleted_byte_count,omitempty"`
+ ReadOnly bool `protobuf:"varint,7,opt,name=read_only,json=readOnly" json:"read_only,omitempty"`
+ ReplicaPlacement uint32 `protobuf:"varint,8,opt,name=replica_placement,json=replicaPlacement" json:"replica_placement,omitempty"`
+ Version uint32 `protobuf:"varint,9,opt,name=version" json:"version,omitempty"`
+ Ttl uint32 `protobuf:"varint,10,opt,name=ttl" json:"ttl,omitempty"`
+}
+
+func (m *VolumeInformationMessage) Reset() { *m = VolumeInformationMessage{} }
+func (m *VolumeInformationMessage) String() string { return proto.CompactTextString(m) }
+func (*VolumeInformationMessage) ProtoMessage() {}
+func (*VolumeInformationMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *VolumeInformationMessage) GetId() uint32 {
+ if m != nil {
+ return m.Id
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetSize() uint64 {
+ if m != nil {
+ return m.Size
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetCollection() string {
+ if m != nil {
+ return m.Collection
+ }
+ return ""
+}
+
+func (m *VolumeInformationMessage) GetFileCount() uint64 {
+ if m != nil {
+ return m.FileCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeleteCount() uint64 {
+ if m != nil {
+ return m.DeleteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetDeletedByteCount() uint64 {
+ if m != nil {
+ return m.DeletedByteCount
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetReadOnly() bool {
+ if m != nil {
+ return m.ReadOnly
+ }
+ return false
+}
+
+func (m *VolumeInformationMessage) GetReplicaPlacement() uint32 {
+ if m != nil {
+ return m.ReplicaPlacement
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetVersion() uint32 {
+ if m != nil {
+ return m.Version
+ }
+ return 0
+}
+
+func (m *VolumeInformationMessage) GetTtl() uint32 {
+ if m != nil {
+ return m.Ttl
+ }
+ return 0
+}
+
+func init() {
+ proto.RegisterType((*Heartbeat)(nil), "pb.Heartbeat")
+ proto.RegisterType((*HeartbeatResponse)(nil), "pb.HeartbeatResponse")
+ proto.RegisterType((*VolumeInformationMessage)(nil), "pb.VolumeInformationMessage")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for Seaweed service
+
+type SeaweedClient interface {
+ SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error)
+}
+
+type seaweedClient struct {
+ cc *grpc.ClientConn
+}
+
+func NewSeaweedClient(cc *grpc.ClientConn) SeaweedClient {
+ return &seaweedClient{cc}
+}
+
+func (c *seaweedClient) SendHeartbeat(ctx context.Context, opts ...grpc.CallOption) (Seaweed_SendHeartbeatClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_Seaweed_serviceDesc.Streams[0], c.cc, "/pb.Seaweed/SendHeartbeat", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &seaweedSendHeartbeatClient{stream}
+ return x, nil
+}
+
+type Seaweed_SendHeartbeatClient interface {
+ Send(*Heartbeat) error
+ Recv() (*HeartbeatResponse, error)
+ grpc.ClientStream
+}
+
+type seaweedSendHeartbeatClient struct {
+ grpc.ClientStream
+}
+
+func (x *seaweedSendHeartbeatClient) Send(m *Heartbeat) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *seaweedSendHeartbeatClient) Recv() (*HeartbeatResponse, error) {
+ m := new(HeartbeatResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+// Server API for Seaweed service
+
+type SeaweedServer interface {
+ SendHeartbeat(Seaweed_SendHeartbeatServer) error
+}
+
+func RegisterSeaweedServer(s *grpc.Server, srv SeaweedServer) {
+ s.RegisterService(&_Seaweed_serviceDesc, srv)
+}
+
+func _Seaweed_SendHeartbeat_Handler(srv interface{}, stream grpc.ServerStream) error {
+ return srv.(SeaweedServer).SendHeartbeat(&seaweedSendHeartbeatServer{stream})
+}
+
+type Seaweed_SendHeartbeatServer interface {
+ Send(*HeartbeatResponse) error
+ Recv() (*Heartbeat, error)
+ grpc.ServerStream
+}
+
+type seaweedSendHeartbeatServer struct {
+ grpc.ServerStream
+}
+
+func (x *seaweedSendHeartbeatServer) Send(m *HeartbeatResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func (x *seaweedSendHeartbeatServer) Recv() (*Heartbeat, error) {
+ m := new(Heartbeat)
+ if err := x.ServerStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+var _Seaweed_serviceDesc = grpc.ServiceDesc{
+ ServiceName: "pb.Seaweed",
+ HandlerType: (*SeaweedServer)(nil),
+ Methods: []grpc.MethodDesc{},
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "SendHeartbeat",
+ Handler: _Seaweed_SendHeartbeat_Handler,
+ ServerStreams: true,
+ ClientStreams: true,
+ },
+ },
+ Metadata: "seaweed.proto",
+}
+
+func init() { proto.RegisterFile("seaweed.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+ // 511 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x93, 0x41, 0x6f, 0xd3, 0x4c,
+ 0x10, 0x86, 0x3f, 0x3b, 0xfe, 0x92, 0x78, 0x52, 0x97, 0x74, 0x25, 0x84, 0x05, 0x05, 0x4c, 0x4e,
+ 0x96, 0x40, 0x11, 0x2a, 0x12, 0x17, 0x6e, 0x54, 0xaa, 0xa8, 0x0a, 0xa2, 0xda, 0x08, 0x2e, 0x1c,
+ 0xac, 0xb5, 0x3d, 0x45, 0xab, 0xae, 0xd7, 0xd6, 0x7a, 0x53, 0xe2, 0xfe, 0x39, 0x2e, 0xfc, 0x30,
+ 0xb4, 0xb3, 0x49, 0x5a, 0x90, 0xb8, 0xcd, 0x3c, 0xfb, 0x8e, 0x77, 0x67, 0xde, 0x31, 0x24, 0x3d,
+ 0x8a, 0x1f, 0x88, 0xf5, 0xb2, 0x33, 0xad, 0x6d, 0x59, 0xd8, 0x95, 0x8b, 0x9f, 0x21, 0xc4, 0x1f,
+ 0x50, 0x18, 0x5b, 0xa2, 0xb0, 0xec, 0x11, 0x4c, 0x64, 0x5f, 0x48, 0x2d, 0x6d, 0x1a, 0x64, 0x41,
+ 0x3e, 0xe5, 0x63, 0xd9, 0x9f, 0x6b, 0x69, 0xd9, 0x21, 0x84, 0xb2, 0x4b, 0xc3, 0x2c, 0xc8, 0x63,
+ 0x1e, 0xca, 0x8e, 0x31, 0x88, 0xba, 0xd6, 0xd8, 0x74, 0x94, 0x05, 0x79, 0xc2, 0x29, 0x66, 0x4f,
+ 0x01, 0xba, 0x75, 0xa9, 0x64, 0x55, 0xac, 0x8d, 0x4a, 0x23, 0xd2, 0xc6, 0x9e, 0x7c, 0x31, 0x8a,
+ 0xe5, 0x30, 0x6f, 0xc4, 0xa6, 0xb8, 0x69, 0xd5, 0xba, 0xc1, 0xa2, 0x6a, 0xd7, 0xda, 0xa6, 0xff,
+ 0x53, 0xf9, 0x61, 0x23, 0x36, 0x5f, 0x09, 0x9f, 0x3a, 0xca, 0x32, 0x38, 0x70, 0xca, 0x2b, 0xa9,
+ 0xb0, 0xb8, 0xc6, 0x21, 0x1d, 0x67, 0x41, 0x1e, 0x71, 0x68, 0xc4, 0xe6, 0x4c, 0x2a, 0xbc, 0xc0,
+ 0x81, 0x3d, 0x87, 0x59, 0x2d, 0xac, 0x28, 0x2a, 0xd4, 0x16, 0x4d, 0x3a, 0xa1, 0xbb, 0xc0, 0xa1,
+ 0x53, 0x22, 0xee, 0x7d, 0x46, 0x54, 0xd7, 0xe9, 0x94, 0x4e, 0x28, 0x66, 0x6f, 0x61, 0xe2, 0x2f,
+ 0xef, 0xd3, 0x38, 0x1b, 0xe5, 0xb3, 0x93, 0xe3, 0x65, 0x57, 0x2e, 0xfd, 0xc5, 0xe7, 0xfa, 0xaa,
+ 0x35, 0x8d, 0xb0, 0xb2, 0xd5, 0x9f, 0xb0, 0xef, 0xc5, 0x77, 0xe4, 0x3b, 0xb1, 0xeb, 0x4b, 0xd4,
+ 0x8d, 0xd4, 0x05, 0x75, 0x0c, 0xf4, 0xe4, 0x98, 0xc8, 0x65, 0x6b, 0xec, 0xe2, 0x1b, 0x1c, 0xed,
+ 0x07, 0xc8, 0xb1, 0xef, 0x5a, 0xdd, 0x23, 0xcb, 0xe1, 0x81, 0x2f, 0x5f, 0xc9, 0x5b, 0xfc, 0x28,
+ 0x9b, 0xed, 0x40, 0x23, 0xfe, 0x37, 0x66, 0xc7, 0x10, 0xf7, 0x58, 0x19, 0xb4, 0x17, 0x38, 0x6c,
+ 0x07, 0x7c, 0x07, 0x16, 0xbf, 0x42, 0x48, 0xff, 0xf5, 0x42, 0x32, 0xa5, 0xa6, 0xef, 0x26, 0x3c,
+ 0x94, 0xb5, 0x6b, 0xba, 0x97, 0xb7, 0x48, 0x5f, 0x89, 0x38, 0xc5, 0xec, 0x19, 0x40, 0xd5, 0x2a,
+ 0x85, 0x95, 0x2b, 0x24, 0xbb, 0x62, 0x7e, 0x8f, 0xb8, 0xe6, 0x68, 0xce, 0xde, 0x8f, 0x88, 0x2a,
+ 0x63, 0x47, 0xbc, 0x15, 0x2f, 0xe0, 0xa0, 0x46, 0x85, 0xf6, 0xbe, 0x61, 0x11, 0x9f, 0x79, 0xe6,
+ 0x25, 0xaf, 0x80, 0xf9, 0xb4, 0x2e, 0xca, 0x61, 0x2f, 0xf4, 0x9e, 0xcd, 0xb7, 0x27, 0xef, 0x87,
+ 0x9d, 0xfa, 0x09, 0xc4, 0x06, 0x45, 0x5d, 0xb4, 0x5a, 0x0d, 0xe4, 0xdb, 0x94, 0x4f, 0x1d, 0xf8,
+ 0xac, 0xd5, 0xc0, 0x5e, 0xc2, 0x91, 0xc1, 0x4e, 0xc9, 0x4a, 0x14, 0x9d, 0x12, 0x15, 0x36, 0xa8,
+ 0x2d, 0x59, 0x98, 0xf0, 0xf9, 0xf6, 0xe0, 0x72, 0xc7, 0x59, 0x0a, 0x93, 0x1b, 0x34, 0xbd, 0x6b,
+ 0x2b, 0x26, 0xc9, 0x2e, 0x65, 0x73, 0x18, 0x59, 0xab, 0xb6, 0x4e, 0xb9, 0xf0, 0xe4, 0x0c, 0x26,
+ 0x2b, 0xbf, 0xfa, 0xec, 0x1d, 0x24, 0x2b, 0xd4, 0xf5, 0xdd, 0xce, 0x27, 0x6e, 0x0b, 0xf6, 0xe9,
+ 0xe3, 0x87, 0x7f, 0xa4, 0x3b, 0x43, 0x17, 0xff, 0xe5, 0xc1, 0xeb, 0xa0, 0x1c, 0xd3, 0x8f, 0xf3,
+ 0xe6, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7d, 0xc1, 0xd3, 0x35, 0x49, 0x03, 0x00, 0x00,
+}
diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto
new file mode 100644
index 000000000..2dc8343a2
--- /dev/null
+++ b/weed/pb/seaweed.proto
@@ -0,0 +1,41 @@
+syntax = "proto3";
+
+package pb;
+
+//////////////////////////////////////////////////
+
+service Seaweed {
+ rpc SendHeartbeat(stream Heartbeat) returns (stream HeartbeatResponse) {}
+}
+
+//////////////////////////////////////////////////
+
+message Heartbeat {
+ bool is_init = 1;
+ string ip = 2;
+ uint32 port = 3;
+ string public_url = 4;
+ uint32 max_volume_count = 5;
+ uint64 max_file_key = 6;
+ string data_center = 7;
+ string rack = 8;
+ repeated VolumeInformationMessage volumes = 9;
+ uint32 admin_port = 10;
+}
+message HeartbeatResponse {
+ uint64 volumeSizeLimit = 1;
+ string secretKey = 2;
+}
+
+message VolumeInformationMessage {
+ uint32 id = 1;
+ uint64 size = 2;
+ string collection = 3;
+ uint64 file_count = 4;
+ uint64 delete_count = 5;
+ uint64 deleted_byte_count = 6;
+ bool read_only = 7;
+ uint32 replica_placement = 8;
+ uint32 version = 9;
+ uint32 ttl = 10;
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
new file mode 100644
index 000000000..29c95a3d4
--- /dev/null
+++ b/weed/server/master_grpc_server.go
@@ -0,0 +1,57 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/topology"
+)
+
+func (ms MasterServer) SendHeartbeat(stream pb.Seaweed_SendHeartbeatServer) error {
+ var dn *topology.DataNode
+ t := ms.Topo
+ for {
+ heartbeat, err := stream.Recv()
+ if err == nil {
+ if dn == nil {
+ t.Sequence.SetMax(heartbeat.MaxFileKey)
+ dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip,
+ int(heartbeat.Port), heartbeat.PublicUrl,
+ int(heartbeat.MaxVolumeCount))
+ glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
+ if err := stream.Send(&pb.HeartbeatResponse{
+ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
+ SecretKey: string(ms.guard.SecretKey),
+ }); err != nil {
+ return err
+ }
+ }
+
+ var volumeInfos []storage.VolumeInfo
+ for _, v := range heartbeat.Volumes {
+ if vi, err := storage.NewVolumeInfo(v); err == nil {
+ volumeInfos = append(volumeInfos, vi)
+ } else {
+ glog.V(0).Infof("Fail to convert joined volume information: %v", err)
+ }
+ }
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
+ for _, v := range volumeInfos {
+ t.RegisterVolumeLayout(v, dn)
+ }
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
+
+ } else {
+ glog.V(0).Infof("lost volume server %s:%d", dn.Ip, dn.Port)
+ if dn != nil {
+ t.UnRegisterDataNode(dn)
+ }
+ return err
+ }
+ }
+}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9f59c2400..f02cb2790 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -72,7 +72,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
- r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index efe81bf89..b15125576 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -1,21 +1,16 @@
package weed_server
import (
- "encoding/json"
"errors"
"fmt"
- "io/ioutil"
"math/rand"
"net/http"
"strconv"
- "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -34,37 +29,6 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
ms.Topo.DeleteCollection(r.FormValue("collection"))
}
-func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
- body, err := ioutil.ReadAll(r.Body)
- if err != nil {
- writeJsonError(w, r, http.StatusBadRequest, err)
- return
- }
- joinMessage := &operation.JoinMessage{}
- if err = proto.Unmarshal(body, joinMessage); err != nil {
- writeJsonError(w, r, http.StatusBadRequest, err)
- return
- }
- if *joinMessage.Ip == "" {
- *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")]
- }
- if glog.V(4) {
- if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
- glog.V(0).Infoln("json marshaling error: ", jsonError)
- writeJsonError(w, r, http.StatusBadRequest, jsonError)
- return
- } else {
- glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
- }
- }
-
- ms.Topo.ProcessJoinMessage(joinMessage)
- writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
- VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
- SecretKey: string(ms.guard.SecretKey),
- })
-}
-
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go
new file mode 100644
index 000000000..54e2c2f75
--- /dev/null
+++ b/weed/server/volume_grpc_client.go
@@ -0,0 +1,74 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+)
+
+func (vs *VolumeServer) heartbeat() {
+
+ glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
+ vs.masterNodes = storage.NewMasterNodes(vs.masterNode)
+ vs.store.SetDataCenter(vs.dataCenter)
+ vs.store.SetRack(vs.rack)
+
+ for {
+ err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second)
+ if err != nil {
+ glog.V(0).Infof("heartbeat error: %v", err)
+ time.Sleep(time.Duration(3*vs.pulseSeconds) * time.Second)
+ }
+ }
+}
+
+func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
+
+ masterNode, err := vs.masterNodes.FindMaster()
+ if err != nil {
+ return fmt.Errorf("No master found: %v", err)
+ }
+
+ grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure())
+ if err != nil {
+ return fmt.Errorf("fail to dial: %v", err)
+ }
+ defer grpcConection.Close()
+
+ client := pb.NewSeaweedClient(grpcConection)
+ stream, err := client.SendHeartbeat(context.Background())
+ if err != nil {
+ glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err)
+ return err
+ }
+ vs.SetMasterNode(masterNode)
+ glog.V(0).Infof("Heartbeat to %s", masterNode)
+
+ vs.store.Client = stream
+ defer func() { vs.store.Client = nil }()
+
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ return
+ }
+ vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
+ vs.guard.SecretKey = security.Secret(in.GetSecretKey())
+ }
+ }()
+
+ for {
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return err
+ }
+ time.Sleep(sleepInterval)
+ }
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 1a912a169..e86c33bda 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,10 +1,8 @@
package weed_server
import (
- "math/rand"
"net/http"
"sync"
- "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -19,6 +17,7 @@ type VolumeServer struct {
rack string
store *storage.Store
guard *security.Guard
+ masterNodes *storage.MasterNodes
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
@@ -70,36 +69,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
}
- go func() {
- connected := true
-
- glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode())
- vs.store.SetBootstrapMaster(vs.GetMasterNode())
- vs.store.SetDataCenter(vs.dataCenter)
- vs.store.SetRack(vs.rack)
- for {
- glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode())
- master, secretKey, err := vs.store.SendHeartbeatToMaster()
- if err == nil {
- if !connected {
- connected = true
- vs.SetMasterNode(master)
- vs.guard.SecretKey = secretKey
- glog.V(0).Infoln("Volume Server Connected with master at", master)
- }
- } else {
- glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err)
- if connected {
- connected = false
- }
- }
- if connected {
- time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
- } else {
- time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond)
- }
- }
- }()
+ go vs.heartbeat()
return vs
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index be2044d64..c62ac9ab7 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -1,7 +1,6 @@
package storage
import (
- "encoding/json"
"errors"
"fmt"
"math/rand"
@@ -10,9 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/pb"
)
const (
@@ -76,12 +73,12 @@ type Store struct {
dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists
connected bool
- volumeSizeLimit uint64 //read from the master
- masterNodes *MasterNodes
+ VolumeSizeLimit uint64 //read from the master
+ Client pb.Seaweed_SendHeartbeatClient
}
func (s *Store) String() (str string) {
- str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d, masterNodes:%s", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.volumeSizeLimit, s.masterNodes)
+ str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.VolumeSizeLimit)
return
}
@@ -208,15 +205,8 @@ func (s *Store) SetRack(rack string) {
s.rack = rack
}
-func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
- s.masterNodes = NewMasterNodes(bootstrapMaster)
-}
-func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.Secret, e error) {
- masterNode, e = s.masterNodes.FindMaster()
- if e != nil {
- return
- }
- var volumeMessages []*operation.VolumeInformationMessage
+func (s *Store) CollectHeartbeat() *pb.Heartbeat {
+ var volumeMessages []*pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey uint64
for _, location := range s.Locations {
@@ -226,18 +216,18 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
}
- if !v.expired(s.volumeSizeLimit) {
- volumeMessage := &operation.VolumeInformationMessage{
- Id: proto.Uint32(uint32(k)),
- Size: proto.Uint64(uint64(v.Size())),
- Collection: proto.String(v.Collection),
- FileCount: proto.Uint64(uint64(v.nm.FileCount())),
- DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())),
- DeletedByteCount: proto.Uint64(v.nm.DeletedSize()),
- ReadOnly: proto.Bool(v.readOnly),
- ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())),
- Version: proto.Uint32(uint32(v.Version())),
- Ttl: proto.Uint32(v.Ttl.ToUint32()),
+ if !v.expired(s.VolumeSizeLimit) {
+ volumeMessage := &pb.VolumeInformationMessage{
+ Id: uint32(k),
+ Size: uint64(v.Size()),
+ Collection: v.Collection,
+ FileCount: uint64(v.nm.FileCount()),
+ DeleteCount: uint64(v.nm.DeletedCount()),
+ DeletedByteCount: v.nm.DeletedSize(),
+ ReadOnly: v.readOnly,
+ ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
+ Version: uint32(v.Version()),
+ Ttl: v.Ttl.ToUint32(),
}
volumeMessages = append(volumeMessages, volumeMessage)
} else {
@@ -252,45 +242,17 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
location.Unlock()
}
- joinMessage := &operation.JoinMessage{
- IsInit: proto.Bool(!s.connected),
- Ip: proto.String(s.Ip),
- Port: proto.Uint32(uint32(s.Port)),
- PublicUrl: proto.String(s.PublicUrl),
- MaxVolumeCount: proto.Uint32(uint32(maxVolumeCount)),
- MaxFileKey: proto.Uint64(maxFileKey),
- DataCenter: proto.String(s.dataCenter),
- Rack: proto.String(s.rack),
+ return &pb.Heartbeat{
+ Ip: s.Ip,
+ Port: uint32(s.Port),
+ PublicUrl: s.PublicUrl,
+ MaxVolumeCount: uint32(maxVolumeCount),
+ MaxFileKey: maxFileKey,
+ DataCenter: s.dataCenter,
+ Rack: s.rack,
Volumes: volumeMessages,
}
- data, err := proto.Marshal(joinMessage)
- if err != nil {
- return "", "", err
- }
-
- joinUrl := "http://" + masterNode + "/dir/join"
- glog.V(4).Infof("Connecting to %s ...", joinUrl)
-
- jsonBlob, err := util.PostBytes(joinUrl, data)
- if err != nil {
- s.masterNodes.Reset()
- return "", "", err
- }
- var ret operation.JoinResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- glog.V(0).Infof("Failed to join %s with response: %s", joinUrl, string(jsonBlob))
- s.masterNodes.Reset()
- return masterNode, "", err
- }
- if ret.Error != "" {
- s.masterNodes.Reset()
- return masterNode, "", errors.New(ret.Error)
- }
- s.volumeSizeLimit = ret.VolumeSizeLimit
- secretKey = security.Secret(ret.SecretKey)
- s.connected = true
- return
}
func (s *Store) Close() {
for _, location := range s.Locations {
@@ -307,12 +269,14 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
size, err = v.writeNeedle(n)
} else {
- err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.volumeSizeLimit, v.ContentSize())
+ err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.VolumeSizeLimit, v.ContentSize())
}
- if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
- glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
- if _, _, e := s.SendHeartbeatToMaster(); e != nil {
- glog.V(0).Infoln("error when reporting size:", e)
+ if s.VolumeSizeLimit < v.ContentSize()+3*uint64(size) {
+ glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.VolumeSizeLimit)
+ if s.Client != nil {
+ if e := s.Client.Send(s.CollectHeartbeat()); e != nil {
+ glog.V(0).Infoln("error when reporting size:", e)
+ }
}
}
return
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index b3068eec3..c73c27fe4 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -2,8 +2,9 @@ package storage
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/operation"
"sort"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
)
type VolumeInfo struct {
@@ -19,23 +20,23 @@ type VolumeInfo struct {
ReadOnly bool
}
-func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err error) {
+func NewVolumeInfo(m *pb.VolumeInformationMessage) (vi VolumeInfo, err error) {
vi = VolumeInfo{
- Id: VolumeId(*m.Id),
- Size: *m.Size,
- Collection: *m.Collection,
- FileCount: int(*m.FileCount),
- DeleteCount: int(*m.DeleteCount),
- DeletedByteCount: *m.DeletedByteCount,
- ReadOnly: *m.ReadOnly,
- Version: Version(*m.Version),
+ Id: VolumeId(m.Id),
+ Size: m.Size,
+ Collection: m.Collection,
+ FileCount: int(m.FileCount),
+ DeleteCount: int(m.DeleteCount),
+ DeletedByteCount: m.DeletedByteCount,
+ ReadOnly: m.ReadOnly,
+ Version: Version(m.Version),
}
- rp, e := NewReplicaPlacementFromByte(byte(*m.ReplicaPlacement))
+ rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
vi.ReplicaPlacement = rp
- vi.Ttl = LoadTTLFromUint32(*m.Ttl)
+ vi.Ttl = LoadTTLFromUint32(m.Ttl)
return vi, nil
}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index b7f039559..0ef8ae14e 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -15,7 +15,6 @@ type DataNode struct {
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
- Dead bool
}
func NewDataNode(id string) *DataNode {
@@ -30,7 +29,7 @@ func NewDataNode(id string) *DataNode {
func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
- return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+ return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4ce35f4b0..7383f9576 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -242,12 +242,6 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen < freshThreshHold {
- if !dn.Dead {
- dn.Dead = true
- n.GetTopology().chanDeadDataNodes <- dn
- }
- }
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 1ca2f8de8..a48d64323 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -32,11 +32,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
- if dn.Dead {
- dn.Dead = false
- r.GetTopology().chanRecoveredDataNodes <- dn
- dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
- }
return dn
}
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 04b500053..ffd32ae21 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,11 +23,9 @@ type Topology struct {
Sequence sequence.Sequencer
- chanDeadDataNodes chan *DataNode
- chanRecoveredDataNodes chan *DataNode
- chanFullVolumes chan storage.VolumeInfo
+ chanFullVolumes chan storage.VolumeInfo
- configuration *Configuration
+ Configuration *Configuration
RaftServer raft.Server
}
@@ -45,8 +42,6 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.Sequence = seq
- t.chanDeadDataNodes = make(chan *DataNode)
- t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
err := t.loadConfiguration(confFile)
@@ -80,7 +75,7 @@ func (t *Topology) Leader() (string, error) {
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
- t.configuration, e = NewConfiguration(b)
+ t.Configuration, e = NewConfiguration(b)
return e
}
glog.V(0).Infoln("Using default configurations.")
@@ -147,35 +142,6 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
- t.Sequence.SetMax(*joinMessage.MaxFileKey)
- dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
- if *joinMessage.IsInit && dn != nil {
- t.UnRegisterDataNode(dn)
- }
- dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
- int(*joinMessage.Port), *joinMessage.PublicUrl,
- int(*joinMessage.MaxVolumeCount))
- var volumeInfos []storage.VolumeInfo
- for _, v := range joinMessage.Volumes {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- } else {
- glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
- }
- }
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- t.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- t.UnRegisterVolumeLayout(v, dn)
- }
-}
-
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() {
dc := c.(*DataCenter)
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 476aaf4d8..40019fdcd 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -31,12 +31,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
select {
case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v)
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- glog.V(0).Infoln("Recovered DataNode: %v", dn)
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- glog.V(0).Infof("Dead DataNode: %v", dn)
}
}
}()
@@ -64,11 +58,3 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id())
}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.GetVolumes() {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
- if vl.isWritable(&v) {
- vl.SetVolumeAvailable(dn, v.Id)
- }
- }
-}