diff options
Diffstat (limited to 'weed/messaging/client')
| -rw-r--r-- | weed/messaging/client/client.go | 27 | ||||
| -rw-r--r-- | weed/messaging/client/publisher.go | 72 |
2 files changed, 92 insertions, 7 deletions
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go index 9bf9bc71e..3f6d1ca53 100644 --- a/weed/messaging/client/client.go +++ b/weed/messaging/client/client.go @@ -1,11 +1,34 @@ package client +import ( + "context" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + type MessagingClient struct { bootstrapBrokers []string + grpcConnection *grpc.ClientConn } -func NewMessagingClient(bootstrapBrokers []string) *MessagingClient { +func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) { + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client") + + grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption) + if err != nil { + return nil, err + } + return &MessagingClient{ bootstrapBrokers: bootstrapBrokers, - } + grpcConnection: grpcConnection, + }, nil +} + +func (mc *MessagingClient) Shutdown() { + mc.grpcConnection.Close() } diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go index 3e21cc557..d4c0f798a 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/client/publisher.go @@ -1,14 +1,76 @@ package client -import "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) type Publisher struct { + publishClient messaging_pb.SeaweedMessaging_PublishClient +} + +func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) { + + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) + if err != nil { + return nil, err + } + + // send init message + err = stream.Send(&messaging_pb.PublishRequest{ + Init: &messaging_pb.PublishRequest_InitMessage{ + Namespace: namespace, + Topic: topic, + Partition: 0, + }, + }) + if err != nil { + return nil, err + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return nil, err + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + if initResponse.Config != nil { + } + + // setup looks for control messages + doneChan := make(chan error, 1) + go func() { + for { + in, err := stream.Recv() + if err != nil { + doneChan <- err + return + } + if in.Redirect != nil{ + } + if in.Config != nil{ + } + } + }() + + return &Publisher{ + publishClient: stream, + }, nil } -func (c *MessagingClient) NewPublisher(namespace, topic string) *Publisher { - return &Publisher{} +func (p *Publisher) Publish(m *messaging_pb.RawData) error { + + return p.publishClient.Send(&messaging_pb.PublishRequest{ + Data: m, + }) + } -func (p *Publisher) Publish(m *messaging_pb.RawData) error{ - return nil +func (p *Publisher) Shutdown() { + + p.publishClient.CloseSend() + } |
