aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/msgclient')
-rw-r--r--weed/mq/msgclient/chan_config.go5
-rw-r--r--weed/mq/msgclient/chan_pub.go76
-rw-r--r--weed/mq/msgclient/chan_sub.go85
-rw-r--r--weed/mq/msgclient/client.go55
-rw-r--r--weed/mq/msgclient/config.go63
-rw-r--r--weed/mq/msgclient/publisher.go118
-rw-r--r--weed/mq/msgclient/subscriber.go120
7 files changed, 0 insertions, 522 deletions
diff --git a/weed/mq/msgclient/chan_config.go b/weed/mq/msgclient/chan_config.go
deleted file mode 100644
index a75678815..000000000
--- a/weed/mq/msgclient/chan_config.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package msgclient
-
-func (mc *MessagingClient) DeleteChannel(chanName string) error {
- return mc.DeleteTopic("chan", chanName)
-}
diff --git a/weed/mq/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go
deleted file mode 100644
index f4ffe832a..000000000
--- a/weed/mq/msgclient/chan_pub.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package msgclient
-
-import (
- "crypto/md5"
- "hash"
- "io"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type PubChannel struct {
- client mq_pb.SeaweedMessaging_PublishClient
- grpcConnection *grpc.ClientConn
- md5hash hash.Hash
-}
-
-func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- pc, err := setupPublisherClient(grpcConnection, tp)
- if err != nil {
- return nil, err
- }
- return &PubChannel{
- client: pc,
- grpcConnection: grpcConnection,
- md5hash: md5.New(),
- }, nil
-}
-
-func (pc *PubChannel) Publish(m []byte) error {
- err := pc.client.Send(&mq_pb.PublishRequest{
- Data: &mq_pb.Message{
- Value: m,
- },
- })
- if err == nil {
- pc.md5hash.Write(m)
- }
- return err
-}
-func (pc *PubChannel) Close() error {
-
- // println("send closing")
- if err := pc.client.Send(&mq_pb.PublishRequest{
- Data: &mq_pb.Message{
- IsClose: true,
- },
- }); err != nil {
- log.Printf("err send close: %v", err)
- }
- // println("receive closing")
- if _, err := pc.client.Recv(); err != nil && err != io.EOF {
- log.Printf("err receive close: %v", err)
- }
- // println("close connection")
- if err := pc.grpcConnection.Close(); err != nil {
- log.Printf("err connection close: %v", err)
- }
- return nil
-}
-
-func (pc *PubChannel) Md5() []byte {
- return pc.md5hash.Sum(nil)
-}
diff --git a/weed/mq/msgclient/chan_sub.go b/weed/mq/msgclient/chan_sub.go
deleted file mode 100644
index 859b482ef..000000000
--- a/weed/mq/msgclient/chan_sub.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package msgclient
-
-import (
- "context"
- "crypto/md5"
- "hash"
- "io"
- "log"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type SubChannel struct {
- ch chan []byte
- stream mq_pb.SeaweedMessaging_SubscribeClient
- md5hash hash.Hash
- cancel context.CancelFunc
-}
-
-func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
- tp := broker.TopicPartition{
- Namespace: "chan",
- Topic: chanName,
- Partition: 0,
- }
- grpcConnection, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
- if err != nil {
- return nil, err
- }
-
- t := &SubChannel{
- ch: make(chan []byte),
- stream: sc,
- md5hash: md5.New(),
- cancel: cancel,
- }
-
- go func() {
- for {
- resp, subErr := t.stream.Recv()
- if subErr == io.EOF {
- return
- }
- if subErr != nil {
- log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
- return
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- if resp.Data.IsClose {
- t.stream.Send(&mq_pb.SubscriberMessage{
- IsClose: true,
- })
- close(t.ch)
- cancel()
- return
- }
- t.ch <- resp.Data.Value
- t.md5hash.Write(resp.Data.Value)
- }
- }()
-
- return t, nil
-}
-
-func (sc *SubChannel) Channel() chan []byte {
- return sc.ch
-}
-
-func (sc *SubChannel) Md5() []byte {
- return sc.md5hash.Sum(nil)
-}
-
-func (sc *SubChannel) Cancel() {
- sc.cancel()
-}
diff --git a/weed/mq/msgclient/client.go b/weed/mq/msgclient/client.go
deleted file mode 100644
index cc64f1acb..000000000
--- a/weed/mq/msgclient/client.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package msgclient
-
-import (
- "context"
- "fmt"
- "log"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnections map[broker.TopicPartition]*grpc.ClientConn
- grpcDialOption grpc.DialOption
-}
-
-func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
- }
-}
-
-func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
-
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &mq_pb.FindBrokerRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Parition: tp.Partition,
- })
- if err != nil {
- return nil, err
- }
-
- targetBroker := resp.Broker
- return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
- }
- return nil, fmt.Errorf("no broker found for %+v", tp)
-}
diff --git a/weed/mq/msgclient/config.go b/weed/mq/msgclient/config.go
deleted file mode 100644
index 263ee856e..000000000
--- a/weed/mq/msgclient/config.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package msgclient
-
-import (
- "context"
- "log"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
-
- return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.ConfigureTopic(context.Background(),
- &mq_pb.ConfigureTopicRequest{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Configuration: &mq_pb.TopicConfiguration{
- PartitionCount: 0,
- Collection: "",
- Replication: "",
- IsTransient: false,
- Partitoning: 0,
- },
- })
- return err
- })
-
-}
-
-func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
-
- return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.DeleteTopic(context.Background(),
- &mq_pb.DeleteTopicRequest{
- Namespace: namespace,
- Topic: topic,
- })
- return err
- })
-}
-
-func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
-
- var lastErr error
- for _, broker := range mc.bootstrapBrokers {
- grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
- if err != nil {
- log.Printf("dial broker %s: %v", broker, err)
- continue
- }
- defer grpcConnection.Close()
-
- err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
- if err == nil {
- return nil
- }
- lastErr = err
- }
-
- return lastErr
-}
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go
deleted file mode 100644
index 823791d10..000000000
--- a/weed/mq/msgclient/publisher.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package msgclient
-
-import (
- "context"
-
- "github.com/OneOfOne/xxhash"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type Publisher struct {
- publishClients []mq_pb.SeaweedMessaging_PublishClient
- topicConfiguration *mq_pb.TopicConfiguration
- messageCount uint64
- publisherId string
-}
-
-func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
- // read topic configuration
- topicConfiguration := &mq_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- client, err := setupPublisherClient(grpcClientConn, tp)
- if err != nil {
- return nil, err
- }
- publishClients[i] = client
- }
- return &Publisher{
- publishClients: publishClients,
- topicConfiguration: topicConfiguration,
- }, nil
-}
-
-func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
-
- stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&mq_pb.PublishRequest{
- Init: &mq_pb.PublishRequest_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
- if initResponse.Config != nil {
- }
-
- // setup looks for control messages
- doneChan := make(chan error, 1)
- go func() {
- for {
- in, err := stream.Recv()
- if err != nil {
- doneChan <- err
- return
- }
- if in.Redirect != nil {
- }
- if in.Config != nil {
- }
- }
- }()
-
- return stream, nil
-
-}
-
-func (p *Publisher) Publish(m *mq_pb.Message) error {
- hashValue := p.messageCount
- p.messageCount++
- if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
- if m.Key != nil {
- hashValue = xxhash.Checksum64(m.Key)
- }
- } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
- hashValue = xxhash.Checksum64(m.Key)
- } else {
- // round robin
- }
-
- idx := int(hashValue) % len(p.publishClients)
- if idx < 0 {
- idx += len(p.publishClients)
- }
- return p.publishClients[idx].Send(&mq_pb.PublishRequest{
- Data: m,
- })
-}
diff --git a/weed/mq/msgclient/subscriber.go b/weed/mq/msgclient/subscriber.go
deleted file mode 100644
index f3da40fb3..000000000
--- a/weed/mq/msgclient/subscriber.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package msgclient
-
-import (
- "context"
- "io"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
-)
-
-type Subscriber struct {
- subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient
- subscriberCancels []context.CancelFunc
- subscriberId string
-}
-
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
- // read topic configuration
- topicConfiguration := &mq_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
- subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
-
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- if partitionId >= 0 && i != partitionId {
- continue
- }
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- ctx, cancel := context.WithCancel(context.Background())
- client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
- if err != nil {
- return nil, err
- }
- subscriberClients[i] = client
- subscriberCancels[i] = cancel
- }
-
- return &Subscriber{
- subscriberClients: subscriberClients,
- subscriberCancels: subscriberCancels,
- subscriberId: subscriberId,
- }, nil
-}
-
-func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) {
- stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
- if err != nil {
- return
- }
-
- // send init message
- err = stream.Send(&mq_pb.SubscriberMessage{
- Init: &mq_pb.SubscriberMessage_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: startTime.UnixNano(),
- SubscriberId: subscriberId,
- },
- })
- if err != nil {
- return
- }
-
- return stream, nil
-}
-
-func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error {
- for {
- resp, listenErr := subscriberClient.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- println(listenErr.Error())
- return listenErr
- }
- if resp.Data == nil {
- // this could be heartbeat from broker
- continue
- }
- processFn(resp.Data)
- }
-}
-
-// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) {
- var wg sync.WaitGroup
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberClients[i] != nil {
- wg.Add(1)
- go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) {
- defer wg.Done()
- doSubscribe(subscriberClient, processFn)
- }(s.subscriberClients[i])
- }
- }
- wg.Wait()
-}
-
-func (s *Subscriber) Shutdown() {
- for i := 0; i < len(s.subscriberClients); i++ {
- if s.subscriberCancels[i] != nil {
- s.subscriberCancels[i]()
- }
- }
-}