diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 87 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 1 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 76 | ||||
| -rw-r--r-- | weed/messaging/client/client.go | 30 | ||||
| -rw-r--r-- | weed/messaging/msgclient/client.go | 56 | ||||
| -rw-r--r-- | weed/messaging/msgclient/pub_sub_chan.go | 96 | ||||
| -rw-r--r-- | weed/messaging/msgclient/publisher.go (renamed from weed/messaging/client/publisher.go) | 14 | ||||
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go (renamed from weed/messaging/client/subscriber.go) | 25 |
8 files changed, 289 insertions, 96 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go new file mode 100644 index 000000000..4b7f357fa --- /dev/null +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -0,0 +1,87 @@ +package broker + +import ( + "context" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +/* +Topic discovery: + +When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker. + +The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it. +Otherwise, just host the topic. + +So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy. +If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help. + +*/ + +func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) { + + panic("implement me") +} + + + +func (broker *MessageBroker) checkPeers() { + + // contact a filer about masters + var masters []string + found := false + for !found { + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err == nil { + found = true + break + } + glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) + time.Sleep(time.Second) + } + } + glog.V(0).Infof("received master list: %s", masters) + + // contact each masters for filers + var filers []string + found = false + for !found { + for _, master := range masters { + err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ + ClientType: "filer", + }) + if err != nil { + return err + } + + filers = append(filers, resp.GrpcAddresses...) + + return nil + }) + if err == nil { + found = true + break + } + glog.V(0).Infof("failed to list filers: %v", err) + time.Sleep(time.Second) + } + } + glog.V(0).Infof("received filer list: %s", filers) + + broker.option.Filers = filers + +} diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 379063eed..472a5007b 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -38,7 +38,6 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } if err = stream.Send(&messaging_pb.BrokerMessage{ - Redirect: nil, }); err != nil { return err } diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 29c227274..9cad27214 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -16,6 +16,7 @@ type MessageBrokerOption struct { Filers []string DefaultReplication string MaxMB int + Ip string Port int Cipher bool } @@ -37,73 +38,44 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio messageBroker.checkPeers() - // go messageBroker.loopForEver() + go messageBroker.keepConnectedToOneFiler() return messageBroker, nil } -func (broker *MessageBroker) loopForEver() { +func (broker *MessageBroker) keepConnectedToOneFiler() { for { - broker.checkPeers() - time.Sleep(3 * time.Second) - } - -} - -func (broker *MessageBroker) checkPeers() { - - // contact a filer about masters - var masters []string - found := false - for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.KeepConnected(context.Background()) if err != nil { + glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err) return err } - masters = append(masters, resp.Masters...) - return nil - }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err) - time.Sleep(time.Second) - } - } - glog.V(0).Infof("received master list: %s", masters) - - // contact each masters for filers - var filers []string - found = false - for !found { - for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { - resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ - ClientType: "filer", - }) - if err != nil { - return err + glog.V(0).Infof("conntected with filer: %v", filer) + for { + if err := stream.Send(&filer_pb.KeepConnectedRequest{ + Name: broker.option.Ip, + GrpcPort: uint32(broker.option.Port), + }); err != nil { + glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("send heartbeat") + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err) + return err + } + // println("received reply") + time.Sleep(11*time.Second) + // println("woke up") } - - filers = append(filers, resp.GrpcAddresses...) - return nil }) - if err == nil { - found = true - break - } - glog.V(0).Infof("failed to list filers: %v", err) - time.Sleep(time.Second) + time.Sleep(3*time.Second) } } - glog.V(0).Infof("received filer list: %s", filers) - - broker.option.Filers = filers } diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go deleted file mode 100644 index 4a674a9fc..000000000 --- a/weed/messaging/client/client.go +++ /dev/null @@ -1,30 +0,0 @@ -package client - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnection *grpc.ClientConn -} - -func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) { - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client") - - grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption) - if err != nil { - return nil, err - } - - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnection: grpcConnection, - }, nil -} diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go new file mode 100644 index 000000000..f4e11232e --- /dev/null +++ b/weed/messaging/msgclient/client.go @@ -0,0 +1,56 @@ +package msgclient + +import ( + "context" + "fmt" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MessagingClient struct { + bootstrapBrokers []string + grpcConnections map[broker.TopicPartition]*grpc.ClientConn + grpcDialOption grpc.DialOption +} + +func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { + return &MessagingClient{ + bootstrapBrokers: bootstrapBrokers, + grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), + } +} + + +func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { + + for _, broker := range mc.bootstrapBrokers { + grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) + if err != nil { + log.Printf("dial broker %s: %v", broker, err) + continue + } + defer grpcConnection.Close() + + resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), + &messaging_pb.FindBrokerRequest{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Parition: tp.Partition, + }) + if err != nil { + return nil, err + } + + targetBroker := resp.Broker + return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) + } + return nil, fmt.Errorf("no broker found for %+v", tp) +} diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_sub_chan.go new file mode 100644 index 000000000..d39e4c658 --- /dev/null +++ b/weed/messaging/msgclient/pub_sub_chan.go @@ -0,0 +1,96 @@ +package msgclient + +import ( + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type PubChannel struct { + client messaging_pb.SeaweedMessaging_PublishClient +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + return pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + Value: m, + }, + }) +} +func (pc *PubChannel) Close() error { + return pc.client.CloseSend() +} + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient +} + +func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.IsClose { + close(t.ch) + return + } + if resp.Data != nil { + t.ch <- resp.Data.Value + } + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +} diff --git a/weed/messaging/client/publisher.go b/weed/messaging/msgclient/publisher.go index 68e5729c1..b0459494b 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/msgclient/publisher.go @@ -1,10 +1,12 @@ -package client +package msgclient import ( "context" "github.com/OneOfOne/xxhash" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) @@ -34,9 +36,9 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (* }, nil } -func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) { +func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) + stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) if err != nil { return nil, err } @@ -44,9 +46,9 @@ func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partiti // send init message err = stream.Send(&messaging_pb.PublishRequest{ Init: &messaging_pb.PublishRequest_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, }, }) if err != nil { diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/msgclient/subscriber.go index 53e7ffc7d..27fa35a5b 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -1,4 +1,4 @@ -package client +package msgclient import ( "context" @@ -36,9 +36,22 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) + stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime) if err != nil { - return nil, err + return client, err + } + if newBroker != nil { + + } + + return stream, nil + +} + +func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) + if err != nil { + return } // send init message @@ -53,20 +66,18 @@ func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic }, }) if err != nil { - return nil, err + return } // process init response initResponse, err := stream.Recv() if err != nil { - return nil, err + return } if initResponse.Redirect != nil { // TODO follow redirection } - return stream, nil - } func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { |
