diff options
Diffstat (limited to 'weed/mq/msgclient')
| -rw-r--r-- | weed/mq/msgclient/chan_config.go | 5 | ||||
| -rw-r--r-- | weed/mq/msgclient/chan_pub.go | 76 | ||||
| -rw-r--r-- | weed/mq/msgclient/chan_sub.go | 85 | ||||
| -rw-r--r-- | weed/mq/msgclient/client.go | 55 | ||||
| -rw-r--r-- | weed/mq/msgclient/config.go | 63 | ||||
| -rw-r--r-- | weed/mq/msgclient/publisher.go | 118 | ||||
| -rw-r--r-- | weed/mq/msgclient/subscriber.go | 120 |
7 files changed, 0 insertions, 522 deletions
diff --git a/weed/mq/msgclient/chan_config.go b/weed/mq/msgclient/chan_config.go deleted file mode 100644 index a75678815..000000000 --- a/weed/mq/msgclient/chan_config.go +++ /dev/null @@ -1,5 +0,0 @@ -package msgclient - -func (mc *MessagingClient) DeleteChannel(chanName string) error { - return mc.DeleteTopic("chan", chanName) -} diff --git a/weed/mq/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go deleted file mode 100644 index f4ffe832a..000000000 --- a/weed/mq/msgclient/chan_pub.go +++ /dev/null @@ -1,76 +0,0 @@ -package msgclient - -import ( - "crypto/md5" - "hash" - "io" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -type PubChannel struct { - client mq_pb.SeaweedMessaging_PublishClient - grpcConnection *grpc.ClientConn - md5hash hash.Hash -} - -func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - pc, err := setupPublisherClient(grpcConnection, tp) - if err != nil { - return nil, err - } - return &PubChannel{ - client: pc, - grpcConnection: grpcConnection, - md5hash: md5.New(), - }, nil -} - -func (pc *PubChannel) Publish(m []byte) error { - err := pc.client.Send(&mq_pb.PublishRequest{ - Data: &mq_pb.Message{ - Value: m, - }, - }) - if err == nil { - pc.md5hash.Write(m) - } - return err -} -func (pc *PubChannel) Close() error { - - // println("send closing") - if err := pc.client.Send(&mq_pb.PublishRequest{ - Data: &mq_pb.Message{ - IsClose: true, - }, - }); err != nil { - log.Printf("err send close: %v", err) - } - // println("receive closing") - if _, err := pc.client.Recv(); err != nil && err != io.EOF { - log.Printf("err receive close: %v", err) - } - // println("close connection") - if err := pc.grpcConnection.Close(); err != nil { - log.Printf("err connection close: %v", err) - } - return nil -} - -func (pc *PubChannel) Md5() []byte { - return pc.md5hash.Sum(nil) -} diff --git a/weed/mq/msgclient/chan_sub.go b/weed/mq/msgclient/chan_sub.go deleted file mode 100644 index 859b482ef..000000000 --- a/weed/mq/msgclient/chan_sub.go +++ /dev/null @@ -1,85 +0,0 @@ -package msgclient - -import ( - "context" - "crypto/md5" - "hash" - "io" - "log" - "time" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -type SubChannel struct { - ch chan []byte - stream mq_pb.SeaweedMessaging_SubscribeClient - md5hash hash.Hash - cancel context.CancelFunc -} - -func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { - tp := broker.TopicPartition{ - Namespace: "chan", - Topic: chanName, - Partition: 0, - } - grpcConnection, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) - if err != nil { - return nil, err - } - - t := &SubChannel{ - ch: make(chan []byte), - stream: sc, - md5hash: md5.New(), - cancel: cancel, - } - - go func() { - for { - resp, subErr := t.stream.Recv() - if subErr == io.EOF { - return - } - if subErr != nil { - log.Printf("fail to receive from netchan %s: %v", chanName, subErr) - return - } - if resp.Data == nil { - // this could be heartbeat from broker - continue - } - if resp.Data.IsClose { - t.stream.Send(&mq_pb.SubscriberMessage{ - IsClose: true, - }) - close(t.ch) - cancel() - return - } - t.ch <- resp.Data.Value - t.md5hash.Write(resp.Data.Value) - } - }() - - return t, nil -} - -func (sc *SubChannel) Channel() chan []byte { - return sc.ch -} - -func (sc *SubChannel) Md5() []byte { - return sc.md5hash.Sum(nil) -} - -func (sc *SubChannel) Cancel() { - sc.cancel() -} diff --git a/weed/mq/msgclient/client.go b/weed/mq/msgclient/client.go deleted file mode 100644 index cc64f1acb..000000000 --- a/weed/mq/msgclient/client.go +++ /dev/null @@ -1,55 +0,0 @@ -package msgclient - -import ( - "context" - "fmt" - "log" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnections map[broker.TopicPartition]*grpc.ClientConn - grpcDialOption grpc.DialOption -} - -func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), - } -} - -func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { - - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), - &mq_pb.FindBrokerRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Parition: tp.Partition, - }) - if err != nil { - return nil, err - } - - targetBroker := resp.Broker - return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) - } - return nil, fmt.Errorf("no broker found for %+v", tp) -} diff --git a/weed/mq/msgclient/config.go b/weed/mq/msgclient/config.go deleted file mode 100644 index 263ee856e..000000000 --- a/weed/mq/msgclient/config.go +++ /dev/null @@ -1,63 +0,0 @@ -package msgclient - -import ( - "context" - "log" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error { - - return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.ConfigureTopic(context.Background(), - &mq_pb.ConfigureTopicRequest{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Configuration: &mq_pb.TopicConfiguration{ - PartitionCount: 0, - Collection: "", - Replication: "", - IsTransient: false, - Partitoning: 0, - }, - }) - return err - }) - -} - -func (mc *MessagingClient) DeleteTopic(namespace, topic string) error { - - return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.DeleteTopic(context.Background(), - &mq_pb.DeleteTopicRequest{ - Namespace: namespace, - Topic: topic, - }) - return err - }) -} - -func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error { - - var lastErr error - for _, broker := range mc.bootstrapBrokers { - grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) - if err != nil { - log.Printf("dial broker %s: %v", broker, err) - continue - } - defer grpcConnection.Close() - - err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection)) - if err == nil { - return nil - } - lastErr = err - } - - return lastErr -} diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go deleted file mode 100644 index 823791d10..000000000 --- a/weed/mq/msgclient/publisher.go +++ /dev/null @@ -1,118 +0,0 @@ -package msgclient - -import ( - "context" - - "github.com/OneOfOne/xxhash" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -type Publisher struct { - publishClients []mq_pb.SeaweedMessaging_PublishClient - topicConfiguration *mq_pb.TopicConfiguration - messageCount uint64 - publisherId string -} - -func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { - // read topic configuration - topicConfiguration := &mq_pb.TopicConfiguration{ - PartitionCount: 4, - } - publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - tp := broker.TopicPartition{ - Namespace: namespace, - Topic: topic, - Partition: int32(i), - } - grpcClientConn, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - client, err := setupPublisherClient(grpcClientConn, tp) - if err != nil { - return nil, err - } - publishClients[i] = client - } - return &Publisher{ - publishClients: publishClients, - topicConfiguration: topicConfiguration, - }, nil -} - -func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) { - - stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&mq_pb.PublishRequest{ - Init: &mq_pb.PublishRequest_InitMessage{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Partition: tp.Partition, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - if initResponse.Config != nil { - } - - // setup looks for control messages - doneChan := make(chan error, 1) - go func() { - for { - in, err := stream.Recv() - if err != nil { - doneChan <- err - return - } - if in.Redirect != nil { - } - if in.Config != nil { - } - } - }() - - return stream, nil - -} - -func (p *Publisher) Publish(m *mq_pb.Message) error { - hashValue := p.messageCount - p.messageCount++ - if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash { - if m.Key != nil { - hashValue = xxhash.Checksum64(m.Key) - } - } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash { - hashValue = xxhash.Checksum64(m.Key) - } else { - // round robin - } - - idx := int(hashValue) % len(p.publishClients) - if idx < 0 { - idx += len(p.publishClients) - } - return p.publishClients[idx].Send(&mq_pb.PublishRequest{ - Data: m, - }) -} diff --git a/weed/mq/msgclient/subscriber.go b/weed/mq/msgclient/subscriber.go deleted file mode 100644 index f3da40fb3..000000000 --- a/weed/mq/msgclient/subscriber.go +++ /dev/null @@ -1,120 +0,0 @@ -package msgclient - -import ( - "context" - "io" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" - "google.golang.org/grpc" -) - -type Subscriber struct { - subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient - subscriberCancels []context.CancelFunc - subscriberId string -} - -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) { - // read topic configuration - topicConfiguration := &mq_pb.TopicConfiguration{ - PartitionCount: 4, - } - subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) - subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) - - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - if partitionId >= 0 && i != partitionId { - continue - } - tp := broker.TopicPartition{ - Namespace: namespace, - Topic: topic, - Partition: int32(i), - } - grpcClientConn, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) - client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) - if err != nil { - return nil, err - } - subscriberClients[i] = client - subscriberCancels[i] = cancel - } - - return &Subscriber{ - subscriberClients: subscriberClients, - subscriberCancels: subscriberCancels, - subscriberId: subscriberId, - }, nil -} - -func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) { - stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) - if err != nil { - return - } - - // send init message - err = stream.Send(&mq_pb.SubscriberMessage{ - Init: &mq_pb.SubscriberMessage_InitMessage{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Partition: tp.Partition, - StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: startTime.UnixNano(), - SubscriberId: subscriberId, - }, - }) - if err != nil { - return - } - - return stream, nil -} - -func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error { - for { - resp, listenErr := subscriberClient.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - println(listenErr.Error()) - return listenErr - } - if resp.Data == nil { - // this could be heartbeat from broker - continue - } - processFn(resp.Data) - } -} - -// Subscribe starts goroutines to process the messages -func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) { - var wg sync.WaitGroup - for i := 0; i < len(s.subscriberClients); i++ { - if s.subscriberClients[i] != nil { - wg.Add(1) - go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) { - defer wg.Done() - doSubscribe(subscriberClient, processFn) - }(s.subscriberClients[i]) - } - } - wg.Wait() -} - -func (s *Subscriber) Shutdown() { - for i := 0; i < len(s.subscriberClients); i++ { - if s.subscriberCancels[i] != nil { - s.subscriberCancels[i]() - } - } -} |
