aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-12 02:00:54 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:24:38 -0700
commit74f60f246fadee2244b78c0c68259a759be55490 (patch)
tree4b27b3ece34565cf00a0ccf62922016f8bbd4f4d /weed
parent9f479aab98e6d8b02026d935af1f614ee8b0b403 (diff)
downloadseaweedfs-74f60f246fadee2244b78c0c68259a759be55490.tar.xz
seaweedfs-74f60f246fadee2244b78c0c68259a759be55490.zip
dynamically connect to a filer
Diffstat (limited to 'weed')
-rw-r--r--weed/cluster/master_client.go34
-rw-r--r--weed/command/mq_broker.go30
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/filer/filer.go23
-rw-r--r--weed/mq/broker/broker_grpc_server_discovery.go72
-rw-r--r--weed/mq/broker/broker_server.go40
6 files changed, 72 insertions, 128 deletions
diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go
new file mode 100644
index 000000000..b9b1673f3
--- /dev/null
+++ b/weed/cluster/master_client.go
@@ -0,0 +1,34 @@
+package cluster
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
+)
+
+func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) {
+
+ if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: clientType,
+ FilerGroup: filerGroup,
+ })
+
+ glog.V(0).Infof("the cluster has %d %s\n", len(resp.ClusterNodes), clientType)
+ for _, node := range resp.ClusterNodes {
+ existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
+ NodeType: FilerType,
+ Address: node.Address,
+ IsLeader: node.IsLeader,
+ IsAdd: true,
+ CreatedAtNs: node.CreatedAtNs,
+ })
+ }
+ return err
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ return
+}
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go
index cce77bf8e..1b31d0141 100644
--- a/weed/command/mq_broker.go
+++ b/weed/command/mq_broker.go
@@ -1,10 +1,6 @@
package command
import (
- "context"
- "fmt"
- "time"
-
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/util/grace"
@@ -12,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mq/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -26,7 +21,6 @@ type MessageQueueBrokerOptions struct {
masters map[string]pb.ServerAddress
mastersString *string
filerGroup *string
- filer *string
ip *string
port *int
dataCenter *string
@@ -38,7 +32,6 @@ type MessageQueueBrokerOptions struct {
func init() {
cmdMqBroker.Run = runMqBroker // break init cycle
mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers")
- mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
@@ -73,40 +66,17 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
- filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
-
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
- cipher := false
-
- for {
- err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
- }
- cipher = resp.Cipher
- return nil
- })
- if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
- time.Sleep(time.Second)
- } else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
- break
- }
- }
qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{
Masters: mqBrokerOpt.masters,
FilerGroup: *mqBrokerOpt.filerGroup,
DataCenter: *mqBrokerOpt.dataCenter,
Rack: *mqBrokerOpt.rack,
- Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "",
MaxMB: 0,
Ip: *mqBrokerOpt.ip,
Port: *mqBrokerOpt.port,
- Cipher: cipher,
}, grpcDialOption)
// start grpc listener
diff --git a/weed/command/server.go b/weed/command/server.go
index 04b07ed51..b993d9428 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -228,7 +228,6 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
iamOptions.filer = &filerAddress
webdavOptions.filer = &filerAddress
- mqBrokerOptions.filer = &filerAddress
mqBrokerOptions.filerGroup = filerOptions.filerGroup
go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index b0df3f618..fcb22fec6 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -105,28 +105,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
}
func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
-
- if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- FilerGroup: f.MasterClient.FilerGroup,
- })
-
- glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
- for _, node := range resp.ClusterNodes {
- existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
- NodeType: cluster.FilerType,
- Address: node.Address,
- IsLeader: node.IsLeader,
- IsAdd: true,
- CreatedAtNs: node.CreatedAtNs,
- })
- }
- return err
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
- }
- return
+ return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
}
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go
deleted file mode 100644
index 94e89cd41..000000000
--- a/weed/mq/broker/broker_grpc_server_discovery.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package broker
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/cluster"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-)
-
-func (broker *MessageQueueBroker) checkFilers() {
-
- // contact a filer about masters
- var masters []pb.ServerAddress
- found := false
- for !found {
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
- }
- for _, m := range resp.Masters {
- masters = append(masters, pb.ServerAddress(m))
- }
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []pb.ServerAddress
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- })
- if err != nil {
- return err
- }
-
- for _, clusterNode := range resp.ClusterNodes {
- filers = append(filers, pb.ServerAddress(clusterNode.Address))
- }
-
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
-
-}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 64ea7c666..f940b00c3 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
+ "time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -16,7 +17,6 @@ type MessageQueueBrokerOption struct {
FilerGroup string
DataCenter string
Rack string
- Filers []pb.ServerAddress
DefaultReplication string
MaxMB int
Ip string
@@ -29,6 +29,8 @@ type MessageQueueBroker struct {
option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
+ filers map[pb.ServerAddress]struct{}
+ currentFiler pb.ServerAddress
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -37,15 +39,47 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
option: option,
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
+ filers: make(map[pb.ServerAddress]struct{}),
}
-
- mqBroker.checkFilers()
+ mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate
go mqBroker.MasterClient.KeepConnectedToMaster()
+ existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
+ for _, newNode := range existingNodes {
+ mqBroker.OnBrokerUpdate(newNode, time.Now())
+ }
+
return mqBroker, nil
}
+func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.FilerType {
+ return
+ }
+
+ address := pb.ServerAddress(update.Address)
+ if update.IsAdd {
+ broker.filers[address] = struct{}{}
+ if broker.currentFiler == "" {
+ broker.currentFiler = address
+ }
+ } else {
+ delete(broker.filers, address)
+ if broker.currentFiler == address {
+ for filer, _ := range broker.filers {
+ broker.currentFiler = filer
+ break
+ }
+ }
+ }
+
+}
+
+func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
+ return broker.currentFiler
+}
+
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)