aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/client')
-rw-r--r--weed/messaging/client/client.go27
-rw-r--r--weed/messaging/client/publisher.go72
2 files changed, 92 insertions, 7 deletions
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
index 9bf9bc71e..3f6d1ca53 100644
--- a/weed/messaging/client/client.go
+++ b/weed/messaging/client/client.go
@@ -1,11 +1,34 @@
package client
+import (
+ "context"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
type MessagingClient struct {
bootstrapBrokers []string
+ grpcConnection *grpc.ClientConn
}
-func NewMessagingClient(bootstrapBrokers []string) *MessagingClient {
+func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
+ if err != nil {
+ return nil, err
+ }
+
return &MessagingClient{
bootstrapBrokers: bootstrapBrokers,
- }
+ grpcConnection: grpcConnection,
+ }, nil
+}
+
+func (mc *MessagingClient) Shutdown() {
+ mc.grpcConnection.Close()
}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go
index 3e21cc557..d4c0f798a 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/client/publisher.go
@@ -1,14 +1,76 @@
package client
-import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
type Publisher struct {
+ publishClient messaging_pb.SeaweedMessaging_PublishClient
+}
+
+func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) {
+
+ stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.PublishRequest{
+ Init: &messaging_pb.PublishRequest_InitMessage{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: 0,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ if initResponse.Config != nil {
+ }
+
+ // setup looks for control messages
+ doneChan := make(chan error, 1)
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ doneChan <- err
+ return
+ }
+ if in.Redirect != nil{
+ }
+ if in.Config != nil{
+ }
+ }
+ }()
+
+ return &Publisher{
+ publishClient: stream,
+ }, nil
}
-func (c *MessagingClient) NewPublisher(namespace, topic string) *Publisher {
- return &Publisher{}
+func (p *Publisher) Publish(m *messaging_pb.RawData) error {
+
+ return p.publishClient.Send(&messaging_pb.PublishRequest{
+ Data: m,
+ })
+
}
-func (p *Publisher) Publish(m *messaging_pb.RawData) error{
- return nil
+func (p *Publisher) Shutdown() {
+
+ p.publishClient.CloseSend()
+
}