aboutsummaryrefslogtreecommitdiff
path: root/weed/server/msg_broker_server.go
diff options
context:
space:
mode:
authorHongyanShen <763987993@qq.com>2020-03-11 12:55:24 +0800
committerGitHub <noreply@github.com>2020-03-11 12:55:24 +0800
commit03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch)
treeed8833386a712c850dcef0815509774681a6ab56 /weed/server/msg_broker_server.go
parent0fca1ae776783b37481549df40f477b7d9248d3c (diff)
parent60f5f05c78a2918d5219c925cea5847759281a2c (diff)
downloadseaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz
seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/server/msg_broker_server.go')
-rw-r--r--weed/server/msg_broker_server.go121
1 files changed, 121 insertions, 0 deletions
diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go
new file mode 100644
index 000000000..a9d908581
--- /dev/null
+++ b/weed/server/msg_broker_server.go
@@ -0,0 +1,121 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessageBrokerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Port int
+}
+
+type MessageBroker struct {
+ option *MessageBrokerOption
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) {
+
+ messageBroker = &MessageBroker{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"),
+ }
+
+ go messageBroker.loopForEver()
+
+ return messageBroker, nil
+}
+
+func (broker *MessageBroker) loopForEver() {
+
+ for {
+ broker.checkPeers()
+ time.Sleep(3 * time.Second)
+ }
+
+}
+
+func (broker *MessageBroker) checkPeers() {
+
+ // contact a filer about masters
+ var masters []string
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+ // contact each masters for filers
+ var filers []string
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to list filers: %v\n", err)
+ return
+ }
+ }
+
+ // contact each filer about brokers
+ for _, filer := range filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+}
+
+func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
+
+}