aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/msg_broker_grpc_server.go23
-rw-r--r--weed/server/msg_broker_server.go121
-rw-r--r--weed/server/queue_server.go49
-rw-r--r--weed/server/raft_server.go11
-rw-r--r--weed/server/volume_grpc_client_to_master.go5
-rw-r--r--weed/server/webdav_server.go3
6 files changed, 156 insertions, 56 deletions
diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go
new file mode 100644
index 000000000..8b13aac76
--- /dev/null
+++ b/weed/server/msg_broker_grpc_server.go
@@ -0,0 +1,23 @@
+package weed_server
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+)
+
+func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
+ panic("implement me")
+}
diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go
new file mode 100644
index 000000000..a9d908581
--- /dev/null
+++ b/weed/server/msg_broker_server.go
@@ -0,0 +1,121 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessageBrokerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Port int
+}
+
+type MessageBroker struct {
+ option *MessageBrokerOption
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) {
+
+ messageBroker = &MessageBroker{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"),
+ }
+
+ go messageBroker.loopForEver()
+
+ return messageBroker, nil
+}
+
+func (broker *MessageBroker) loopForEver() {
+
+ for {
+ broker.checkPeers()
+ time.Sleep(3 * time.Second)
+ }
+
+}
+
+func (broker *MessageBroker) checkPeers() {
+
+ // contact a filer about masters
+ var masters []string
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+ // contact each masters for filers
+ var filers []string
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to list filers: %v\n", err)
+ return
+ }
+ }
+
+ // contact each filer about brokers
+ for _, filer := range filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+}
+
+func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
+
+}
diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go
deleted file mode 100644
index 078c76a30..000000000
--- a/weed/server/queue_server.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package weed_server
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type QueueServerOption struct {
- Filers []string
- DefaultReplication string
- MaxMB int
- Port int
-}
-
-type QueueServer struct {
- option *QueueServerOption
- grpcDialOption grpc.DialOption
-}
-
-func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
- panic("implement me")
-}
-
-func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
- panic("implement me")
-}
-
-func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
- panic("implement me")
-}
-
-func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) {
-
- qs = &QueueServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"),
- }
-
- return qs, nil
-}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 53289f1c1..0381c7feb 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,8 +2,6 @@ package weed_server
import (
"encoding/json"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
"io/ioutil"
"os"
"path"
@@ -11,7 +9,12 @@ import (
"sort"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
+ s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
@@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 2168afee7..1f4d9df10 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -36,7 +37,7 @@ func (vs *VolumeServer) heartbeat() {
if newLeader != "" {
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
if parseErr != nil {
glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
@@ -55,7 +56,7 @@ func (vs *VolumeServer) heartbeat() {
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index ddd611724..a07f6be01 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -98,7 +99,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)