aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/client')
-rw-r--r--weed/messaging/client/client.go30
-rw-r--r--weed/messaging/client/publisher.go113
-rw-r--r--weed/messaging/client/subscriber.go91
3 files changed, 0 insertions, 234 deletions
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
deleted file mode 100644
index 4a674a9fc..000000000
--- a/weed/messaging/client/client.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package client
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnection *grpc.ClientConn
-}
-
-func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
-
- grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
- if err != nil {
- return nil, err
- }
-
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnection: grpcConnection,
- }, nil
-}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go
deleted file mode 100644
index 68e5729c1..000000000
--- a/weed/messaging/client/publisher.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package client
-
-import (
- "context"
-
- "github.com/OneOfOne/xxhash"
-
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type Publisher struct {
- publishClients []messaging_pb.SeaweedMessaging_PublishClient
- topicConfiguration *messaging_pb.TopicConfiguration
- messageCount uint64
- publisherId string
-}
-
-func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
- // read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- client, err := mc.setupPublisherClient(namespace, topic, int32(i))
- if err != nil {
- return nil, err
- }
- publishClients[i] = client
- }
- return &Publisher{
- publishClients: publishClients,
- topicConfiguration: topicConfiguration,
- }, nil
-}
-
-func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) {
-
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&messaging_pb.PublishRequest{
- Init: &messaging_pb.PublishRequest_InitMessage{
- Namespace: namespace,
- Topic: topic,
- Partition: partition,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
- if initResponse.Config != nil {
- }
-
- // setup looks for control messages
- doneChan := make(chan error, 1)
- go func() {
- for {
- in, err := stream.Recv()
- if err != nil {
- doneChan <- err
- return
- }
- if in.Redirect != nil {
- }
- if in.Config != nil {
- }
- }
- }()
-
- return stream, nil
-
-}
-
-func (p *Publisher) Publish(m *messaging_pb.Message) error {
- hashValue := p.messageCount
- p.messageCount++
- if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
- if m.Key != nil {
- hashValue = xxhash.Checksum64(m.Key)
- }
- } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
- hashValue = xxhash.Checksum64(m.Key)
- } else {
- // round robin
- }
-
- idx := int(hashValue) % len(p.publishClients)
- if idx < 0 {
- idx += len(p.publishClients)
- }
- return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
- Data: m,
- })
-}
-
-func (p *Publisher) Shutdown() {
- for _, client := range p.publishClients {
- client.CloseSend()
- }
-}
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
deleted file mode 100644
index 53e7ffc7d..000000000
--- a/weed/messaging/client/subscriber.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package client
-
-import (
- "context"
- "io"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type Subscriber struct {
- subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
- subscriberId string
-}
-
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
- // read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
-
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
- if err != nil {
- return nil, err
- }
- subscriberClients[i] = client
- }
-
- return &Subscriber{
- subscriberClients: subscriberClients,
- subscriberId: subscriberId,
- }, nil
-}
-
-func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
-
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&messaging_pb.SubscriberMessage{
- Init: &messaging_pb.SubscriberMessage_InitMessage{
- Namespace: namespace,
- Topic: topic,
- Partition: partition,
- StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: startTime.UnixNano(),
- SubscriberId: subscriberId,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
-
- return stream, nil
-
-}
-
-func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
- for {
- resp, listenErr := s.subscriberClients[partition].Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- println(listenErr.Error())
- return listenErr
- }
- processFn(resp.Data)
- }
-}
-
-// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
- for i := 0; i < len(s.subscriberClients); i++ {
- go s.doSubscribe(i, processFn)
- }
-}