aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/msgclient')
-rw-r--r--weed/messaging/msgclient/client.go56
-rw-r--r--weed/messaging/msgclient/pub_sub_chan.go96
-rw-r--r--weed/messaging/msgclient/publisher.go115
-rw-r--r--weed/messaging/msgclient/subscriber.go102
4 files changed, 369 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
new file mode 100644
index 000000000..f4e11232e
--- /dev/null
+++ b/weed/messaging/msgclient/client.go
@@ -0,0 +1,56 @@
+package msgclient
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessagingClient struct {
+ bootstrapBrokers []string
+ grpcConnections map[broker.TopicPartition]*grpc.ClientConn
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
+ return &MessagingClient{
+ bootstrapBrokers: bootstrapBrokers,
+ grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
+ }
+}
+
+
+func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
+
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+ &messaging_pb.FindBrokerRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Parition: tp.Partition,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ targetBroker := resp.Broker
+ return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
+ }
+ return nil, fmt.Errorf("no broker found for %+v", tp)
+}
diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_sub_chan.go
new file mode 100644
index 000000000..d39e4c658
--- /dev/null
+++ b/weed/messaging/msgclient/pub_sub_chan.go
@@ -0,0 +1,96 @@
+package msgclient
+
+import (
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+ client messaging_pb.SeaweedMessaging_PublishClient
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := setupPublisherClient(grpcConnection, tp)
+ if err != nil {
+ return nil, err
+ }
+ return &PubChannel{
+ client: pc,
+ }, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+ return pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ Value: m,
+ },
+ })
+}
+func (pc *PubChannel) Close() error {
+ return pc.client.CloseSend()
+}
+
+type SubChannel struct {
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+}
+
+func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0))
+ if err != nil {
+ return nil, err
+ }
+
+ t := &SubChannel{
+ ch: make(chan []byte),
+ stream: sc,
+ }
+
+ go func() {
+ for {
+ resp, subErr := t.stream.Recv()
+ if subErr == io.EOF {
+ return
+ }
+ if subErr != nil {
+ log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+ return
+ }
+ if resp.IsClose {
+ close(t.ch)
+ return
+ }
+ if resp.Data != nil {
+ t.ch <- resp.Data.Value
+ }
+ }
+ }()
+
+ return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+ return sc.ch
+}
diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go
new file mode 100644
index 000000000..b0459494b
--- /dev/null
+++ b/weed/messaging/msgclient/publisher.go
@@ -0,0 +1,115 @@
+package msgclient
+
+import (
+ "context"
+
+ "github.com/OneOfOne/xxhash"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type Publisher struct {
+ publishClients []messaging_pb.SeaweedMessaging_PublishClient
+ topicConfiguration *messaging_pb.TopicConfiguration
+ messageCount uint64
+ publisherId string
+}
+
+func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ client, err := mc.setupPublisherClient(namespace, topic, int32(i))
+ if err != nil {
+ return nil, err
+ }
+ publishClients[i] = client
+ }
+ return &Publisher{
+ publishClients: publishClients,
+ topicConfiguration: topicConfiguration,
+ }, nil
+}
+
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+
+ stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.PublishRequest{
+ Init: &messaging_pb.PublishRequest_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ if initResponse.Config != nil {
+ }
+
+ // setup looks for control messages
+ doneChan := make(chan error, 1)
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ doneChan <- err
+ return
+ }
+ if in.Redirect != nil {
+ }
+ if in.Config != nil {
+ }
+ }
+ }()
+
+ return stream, nil
+
+}
+
+func (p *Publisher) Publish(m *messaging_pb.Message) error {
+ hashValue := p.messageCount
+ p.messageCount++
+ if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
+ if m.Key != nil {
+ hashValue = xxhash.Checksum64(m.Key)
+ }
+ } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
+ hashValue = xxhash.Checksum64(m.Key)
+ } else {
+ // round robin
+ }
+
+ idx := int(hashValue) % len(p.publishClients)
+ if idx < 0 {
+ idx += len(p.publishClients)
+ }
+ return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
+ Data: m,
+ })
+}
+
+func (p *Publisher) Shutdown() {
+ for _, client := range p.publishClients {
+ client.CloseSend()
+ }
+}
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
new file mode 100644
index 000000000..27fa35a5b
--- /dev/null
+++ b/weed/messaging/msgclient/subscriber.go
@@ -0,0 +1,102 @@
+package msgclient
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type Subscriber struct {
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberId string
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
+
+ stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime)
+ if err != nil {
+ return client, err
+ }
+ if newBroker != nil {
+
+ }
+
+ return stream, nil
+
+}
+
+func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background())
+ if err != nil {
+ return
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: partition,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: startTime.UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ return stream, nil
+}
+
+func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := s.subscriberClients[partition].Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ println(listenErr.Error())
+ return listenErr
+ }
+ processFn(resp.Data)
+ }
+}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ for i := 0; i < len(s.subscriberClients); i++ {
+ go s.doSubscribe(i, processFn)
+ }
+}