aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-18 15:17:27 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-18 15:17:27 -0700
commit076c8bd3bcb6f76c84a8df50aff923d493a6bb9d (patch)
treec04613e01216887c2efef3452d217e621832a04b /weed/messaging
parent5d346d44bdec30d632840fb30c520cc2a334e004 (diff)
downloadseaweedfs-076c8bd3bcb6f76c84a8df50aff923d493a6bb9d.tar.xz
seaweedfs-076c8bd3bcb6f76c84a8df50aff923d493a6bb9d.zip
filer master start up with default ip address instead of just localhost
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go15
-rw-r--r--weed/messaging/broker/broker_server.go85
-rw-r--r--weed/messaging/client/client.go27
-rw-r--r--weed/messaging/client/publisher.go72
4 files changed, 149 insertions, 50 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 20e6eb04b..6208b1435 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -27,6 +27,19 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
topicConfig := &messaging_pb.TopicConfiguration{
}
+
+ // send init response
+ initResponse := &messaging_pb.PublishResponse{
+ Config: nil,
+ Redirect: nil,
+ }
+ err = stream.Send(initResponse)
+ if err != nil {
+ return err
+ }
+ if initResponse.Redirect != nil {
+ return nil
+ }
// get lock
tp := TopicPartition{
@@ -87,6 +100,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
Headers: in.Data.Headers,
}
+ println("received message:", string(in.Data.Value))
+
data, err := proto.Marshal(m)
if err != nil {
glog.Errorf("marshall error: %v\n", err)
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 0522eb4b7..158a84e6c 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -2,11 +2,11 @@ package broker
import (
"context"
- "fmt"
"time"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -34,7 +34,9 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
topicLocks: NewTopicLocks(),
}
- go messageBroker.loopForEver()
+ messageBroker.checkPeers()
+
+ // go messageBroker.loopForEver()
return messageBroker, nil
}
@@ -52,58 +54,55 @@ func (broker *MessageBroker) checkPeers() {
// contact a filer about masters
var masters []string
- for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
+ found := false
+ for !found {
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
}
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err != nil {
- fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
- return
+ glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ time.Sleep(time.Second)
}
}
+ glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers
var filers []string
- for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
- ClientType: "filer",
+ found = false
+ for !found {
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
})
- if err != nil {
- return err
+ if err == nil {
+ found = true
+ break
}
-
- fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
- filers = append(filers, resp.GrpcAddresses...)
-
- return nil
- })
- if err != nil {
- fmt.Printf("failed to list filers: %v\n", err)
- return
+ glog.V(0).Infof("failed to list filers: %v", err)
+ time.Sleep(time.Second)
}
}
+ glog.V(0).Infof("received filer list: %s", filers)
- // contact each filer about brokers
- for _, filer := range filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
- }
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err != nil {
- fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
- return
- }
- }
+ broker.option.Filers = filers
}
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
index 9bf9bc71e..3f6d1ca53 100644
--- a/weed/messaging/client/client.go
+++ b/weed/messaging/client/client.go
@@ -1,11 +1,34 @@
package client
+import (
+ "context"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
type MessagingClient struct {
bootstrapBrokers []string
+ grpcConnection *grpc.ClientConn
}
-func NewMessagingClient(bootstrapBrokers []string) *MessagingClient {
+func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
+ if err != nil {
+ return nil, err
+ }
+
return &MessagingClient{
bootstrapBrokers: bootstrapBrokers,
- }
+ grpcConnection: grpcConnection,
+ }, nil
+}
+
+func (mc *MessagingClient) Shutdown() {
+ mc.grpcConnection.Close()
}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go
index 3e21cc557..d4c0f798a 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/client/publisher.go
@@ -1,14 +1,76 @@
package client
-import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
type Publisher struct {
+ publishClient messaging_pb.SeaweedMessaging_PublishClient
+}
+
+func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) {
+
+ stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.PublishRequest{
+ Init: &messaging_pb.PublishRequest_InitMessage{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: 0,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ if initResponse.Config != nil {
+ }
+
+ // setup looks for control messages
+ doneChan := make(chan error, 1)
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ doneChan <- err
+ return
+ }
+ if in.Redirect != nil{
+ }
+ if in.Config != nil{
+ }
+ }
+ }()
+
+ return &Publisher{
+ publishClient: stream,
+ }, nil
}
-func (c *MessagingClient) NewPublisher(namespace, topic string) *Publisher {
- return &Publisher{}
+func (p *Publisher) Publish(m *messaging_pb.RawData) error {
+
+ return p.publishClient.Send(&messaging_pb.PublishRequest{
+ Data: m,
+ })
+
}
-func (p *Publisher) Publish(m *messaging_pb.RawData) error{
- return nil
+func (p *Publisher) Shutdown() {
+
+ p.publishClient.CloseSend()
+
}