diff options
Diffstat (limited to 'weed/messaging/msgclient')
| -rw-r--r-- | weed/messaging/msgclient/client.go | 56 | ||||
| -rw-r--r-- | weed/messaging/msgclient/pub_sub_chan.go | 96 | ||||
| -rw-r--r-- | weed/messaging/msgclient/publisher.go | 115 | ||||
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 102 |
4 files changed, 369 insertions, 0 deletions
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go new file mode 100644 index 000000000..f4e11232e --- /dev/null +++ b/weed/messaging/msgclient/client.go @@ -0,0 +1,56 @@ +package msgclient + +import ( + "context" + "fmt" + "log" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MessagingClient struct { + bootstrapBrokers []string + grpcConnections map[broker.TopicPartition]*grpc.ClientConn + grpcDialOption grpc.DialOption +} + +func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient { + return &MessagingClient{ + bootstrapBrokers: bootstrapBrokers, + grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"), + } +} + + +func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) { + + for _, broker := range mc.bootstrapBrokers { + grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption) + if err != nil { + log.Printf("dial broker %s: %v", broker, err) + continue + } + defer grpcConnection.Close() + + resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(), + &messaging_pb.FindBrokerRequest{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Parition: tp.Partition, + }) + if err != nil { + return nil, err + } + + targetBroker := resp.Broker + return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption) + } + return nil, fmt.Errorf("no broker found for %+v", tp) +} diff --git a/weed/messaging/msgclient/pub_sub_chan.go b/weed/messaging/msgclient/pub_sub_chan.go new file mode 100644 index 000000000..d39e4c658 --- /dev/null +++ b/weed/messaging/msgclient/pub_sub_chan.go @@ -0,0 +1,96 @@ +package msgclient + +import ( + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type PubChannel struct { + client messaging_pb.SeaweedMessaging_PublishClient +} + +func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + pc, err := setupPublisherClient(grpcConnection, tp) + if err != nil { + return nil, err + } + return &PubChannel{ + client: pc, + }, nil +} + +func (pc *PubChannel) Publish(m []byte) error { + return pc.client.Send(&messaging_pb.PublishRequest{ + Data: &messaging_pb.Message{ + Value: m, + }, + }) +} +func (pc *PubChannel) Close() error { + return pc.client.CloseSend() +} + +type SubChannel struct { + ch chan []byte + stream messaging_pb.SeaweedMessaging_SubscribeClient +} + +func (mc *MessagingClient) NewSubChannel(chanName string) (*SubChannel, error) { + tp := broker.TopicPartition{ + Namespace: "chan", + Topic: chanName, + Partition: 0, + } + grpcConnection, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + sc, err := setupSubscriberClient(grpcConnection, "", "chan", chanName, 0, time.Unix(0,0)) + if err != nil { + return nil, err + } + + t := &SubChannel{ + ch: make(chan []byte), + stream: sc, + } + + go func() { + for { + resp, subErr := t.stream.Recv() + if subErr == io.EOF { + return + } + if subErr != nil { + log.Printf("fail to receive from netchan %s: %v", chanName, subErr) + return + } + if resp.IsClose { + close(t.ch) + return + } + if resp.Data != nil { + t.ch <- resp.Data.Value + } + } + }() + + return t, nil +} + +func (sc *SubChannel) Channel() chan []byte { + return sc.ch +} diff --git a/weed/messaging/msgclient/publisher.go b/weed/messaging/msgclient/publisher.go new file mode 100644 index 000000000..b0459494b --- /dev/null +++ b/weed/messaging/msgclient/publisher.go @@ -0,0 +1,115 @@ +package msgclient + +import ( + "context" + + "github.com/OneOfOne/xxhash" + + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type Publisher struct { + publishClients []messaging_pb.SeaweedMessaging_PublishClient + topicConfiguration *messaging_pb.TopicConfiguration + messageCount uint64 + publisherId string +} + +func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupPublisherClient(namespace, topic, int32(i)) + if err != nil { + return nil, err + } + publishClients[i] = client + } + return &Publisher{ + publishClients: publishClients, + topicConfiguration: topicConfiguration, + }, nil +} + +func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) { + + stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) + if err != nil { + return nil, err + } + + // send init message + err = stream.Send(&messaging_pb.PublishRequest{ + Init: &messaging_pb.PublishRequest_InitMessage{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, + }, + }) + if err != nil { + return nil, err + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return nil, err + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + if initResponse.Config != nil { + } + + // setup looks for control messages + doneChan := make(chan error, 1) + go func() { + for { + in, err := stream.Recv() + if err != nil { + doneChan <- err + return + } + if in.Redirect != nil { + } + if in.Config != nil { + } + } + }() + + return stream, nil + +} + +func (p *Publisher) Publish(m *messaging_pb.Message) error { + hashValue := p.messageCount + p.messageCount++ + if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { + if m.Key != nil { + hashValue = xxhash.Checksum64(m.Key) + } + } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { + hashValue = xxhash.Checksum64(m.Key) + } else { + // round robin + } + + idx := int(hashValue) % len(p.publishClients) + if idx < 0 { + idx += len(p.publishClients) + } + return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ + Data: m, + }) +} + +func (p *Publisher) Shutdown() { + for _, client := range p.publishClients { + client.CloseSend() + } +} diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go new file mode 100644 index 000000000..27fa35a5b --- /dev/null +++ b/weed/messaging/msgclient/subscriber.go @@ -0,0 +1,102 @@ +package msgclient + +import ( + "context" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" +) + +type Subscriber struct { + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberId string +} + +func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) + if err != nil { + return nil, err + } + subscriberClients[i] = client + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberId: subscriberId, + }, nil +} + +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { + + stream, newBroker, err := mc.initSubscriberClient(subscriberId, namespace, topic, partition, startTime) + if err != nil { + return client, err + } + if newBroker != nil { + + } + + return stream, nil + +} + +func setupSubscriberClient(grpcConnection *grpc.ClientConn, subscriberId string, namespace string, topic string, partition int32, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) + if err != nil { + return + } + + // send init message + err = stream.Send(&messaging_pb.SubscriberMessage{ + Init: &messaging_pb.SubscriberMessage_InitMessage{ + Namespace: namespace, + Topic: topic, + Partition: partition, + StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, + TimestampNs: startTime.UnixNano(), + SubscriberId: subscriberId, + }, + }) + if err != nil { + return + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + return stream, nil +} + +func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { + for { + resp, listenErr := s.subscriberClients[partition].Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + println(listenErr.Error()) + return listenErr + } + processFn(resp.Data) + } +} + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + for i := 0; i < len(s.subscriberClients); i++ { + go s.doSubscribe(i, processFn) + } +} |
