diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-18 15:17:27 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-18 15:17:27 -0700 |
| commit | 076c8bd3bcb6f76c84a8df50aff923d493a6bb9d (patch) | |
| tree | c04613e01216887c2efef3452d217e621832a04b /weed/messaging | |
| parent | 5d346d44bdec30d632840fb30c520cc2a334e004 (diff) | |
| download | seaweedfs-076c8bd3bcb6f76c84a8df50aff923d493a6bb9d.tar.xz seaweedfs-076c8bd3bcb6f76c84a8df50aff923d493a6bb9d.zip | |
filer master start up with default ip address instead of just localhost
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 15 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 85 | ||||
| -rw-r--r-- | weed/messaging/client/client.go | 27 | ||||
| -rw-r--r-- | weed/messaging/client/publisher.go | 72 |
4 files changed, 149 insertions, 50 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 20e6eb04b..6208b1435 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -27,6 +27,19 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis topicConfig := &messaging_pb.TopicConfiguration{ } + + // send init response + initResponse := &messaging_pb.PublishResponse{ + Config: nil, + Redirect: nil, + } + err = stream.Send(initResponse) + if err != nil { + return err + } + if initResponse.Redirect != nil { + return nil + } // get lock tp := TopicPartition{ @@ -87,6 +100,8 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis Headers: in.Data.Headers, } + println("received message:", string(in.Data.Value)) + data, err := proto.Marshal(m) if err != nil { glog.Errorf("marshall error: %v\n", err) diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 0522eb4b7..158a84e6c 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -2,11 +2,11 @@ package broker import ( "context" - "fmt" "time" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -34,7 +34,9 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio topicLocks: NewTopicLocks(), } - go messageBroker.loopForEver() + messageBroker.checkPeers() + + // go messageBroker.loopForEver() return messageBroker, nil } @@ -52,58 +54,55 @@ func (broker *MessageBroker) checkPeers() { // contact a filer about masters var masters []string - for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err + found := false + for !found { + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err == nil { + found = true + break } - masters = append(masters, resp.Masters...) - return nil - }) - if err != nil { - fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) - return + glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) + time.Sleep(time.Second) } } + glog.V(0).Infof("received master list: %s", masters) // contact each masters for filers var filers []string - for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ - ClientType: "filer", + found = false + for !found { + for _, master := range masters { + err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ + ClientType: "filer", + }) + if err != nil { + return err + } + + filers = append(filers, resp.GrpcAddresses...) + + return nil }) - if err != nil { - return err + if err == nil { + found = true + break } - - fmt.Printf("filers: %+v\n", resp.GrpcAddresses) - filers = append(filers, resp.GrpcAddresses...) - - return nil - }) - if err != nil { - fmt.Printf("failed to list filers: %v\n", err) - return + glog.V(0).Infof("failed to list filers: %v", err) + time.Sleep(time.Second) } } + glog.V(0).Infof("received filer list: %s", filers) - // contact each filer about brokers - for _, filer := range filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return err - } - masters = append(masters, resp.Masters...) - return nil - }) - if err != nil { - fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) - return - } - } + broker.option.Filers = filers } diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go index 9bf9bc71e..3f6d1ca53 100644 --- a/weed/messaging/client/client.go +++ b/weed/messaging/client/client.go @@ -1,11 +1,34 @@ package client +import ( + "context" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + type MessagingClient struct { bootstrapBrokers []string + grpcConnection *grpc.ClientConn } -func NewMessagingClient(bootstrapBrokers []string) *MessagingClient { +func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) { + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client") + + grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption) + if err != nil { + return nil, err + } + return &MessagingClient{ bootstrapBrokers: bootstrapBrokers, - } + grpcConnection: grpcConnection, + }, nil +} + +func (mc *MessagingClient) Shutdown() { + mc.grpcConnection.Close() } diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go index 3e21cc557..d4c0f798a 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/client/publisher.go @@ -1,14 +1,76 @@ package client -import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) type Publisher struct { + publishClient messaging_pb.SeaweedMessaging_PublishClient +} + +func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) { + + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) + if err != nil { + return nil, err + } + + // send init message + err = stream.Send(&messaging_pb.PublishRequest{ + Init: &messaging_pb.PublishRequest_InitMessage{ + Namespace: namespace, + Topic: topic, + Partition: 0, + }, + }) + if err != nil { + return nil, err + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return nil, err + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + if initResponse.Config != nil { + } + + // setup looks for control messages + doneChan := make(chan error, 1) + go func() { + for { + in, err := stream.Recv() + if err != nil { + doneChan <- err + return + } + if in.Redirect != nil{ + } + if in.Config != nil{ + } + } + }() + + return &Publisher{ + publishClient: stream, + }, nil } -func (c *MessagingClient) NewPublisher(namespace, topic string) *Publisher { - return &Publisher{} +func (p *Publisher) Publish(m *messaging_pb.RawData) error { + + return p.publishClient.Send(&messaging_pb.PublishRequest{ + Data: m, + }) + } -func (p *Publisher) Publish(m *messaging_pb.RawData) error{ - return nil +func (p *Publisher) Shutdown() { + + p.publishClient.CloseSend() + } |
