diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2020-05-17 17:39:16 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-17 17:39:16 -0700 |
| commit | e0e31e67a809d00c99edaa299531c7ce4d4750dc (patch) | |
| tree | 0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/messaging/client | |
| parent | b4e02ec525a6ec87b26686202307896faf3296a7 (diff) | |
| parent | 081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff) | |
| download | seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip | |
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/messaging/client')
| -rw-r--r-- | weed/messaging/client/client.go | 30 | ||||
| -rw-r--r-- | weed/messaging/client/publisher.go | 113 | ||||
| -rw-r--r-- | weed/messaging/client/subscriber.go | 91 |
3 files changed, 0 insertions, 234 deletions
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go deleted file mode 100644 index 4a674a9fc..000000000 --- a/weed/messaging/client/client.go +++ /dev/null @@ -1,30 +0,0 @@ -package client - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type MessagingClient struct { - bootstrapBrokers []string - grpcConnection *grpc.ClientConn -} - -func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) { - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client") - - grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption) - if err != nil { - return nil, err - } - - return &MessagingClient{ - bootstrapBrokers: bootstrapBrokers, - grpcConnection: grpcConnection, - }, nil -} diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go deleted file mode 100644 index 68e5729c1..000000000 --- a/weed/messaging/client/publisher.go +++ /dev/null @@ -1,113 +0,0 @@ -package client - -import ( - "context" - - "github.com/OneOfOne/xxhash" - - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type Publisher struct { - publishClients []messaging_pb.SeaweedMessaging_PublishClient - topicConfiguration *messaging_pb.TopicConfiguration - messageCount uint64 - publisherId string -} - -func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { - // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ - PartitionCount: 4, - } - publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupPublisherClient(namespace, topic, int32(i)) - if err != nil { - return nil, err - } - publishClients[i] = client - } - return &Publisher{ - publishClients: publishClients, - topicConfiguration: topicConfiguration, - }, nil -} - -func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) { - - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&messaging_pb.PublishRequest{ - Init: &messaging_pb.PublishRequest_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - if initResponse.Config != nil { - } - - // setup looks for control messages - doneChan := make(chan error, 1) - go func() { - for { - in, err := stream.Recv() - if err != nil { - doneChan <- err - return - } - if in.Redirect != nil { - } - if in.Config != nil { - } - } - }() - - return stream, nil - -} - -func (p *Publisher) Publish(m *messaging_pb.Message) error { - hashValue := p.messageCount - p.messageCount++ - if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { - if m.Key != nil { - hashValue = xxhash.Checksum64(m.Key) - } - } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { - hashValue = xxhash.Checksum64(m.Key) - } else { - // round robin - } - - idx := int(hashValue) % len(p.publishClients) - if idx < 0 { - idx += len(p.publishClients) - } - return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ - Data: m, - }) -} - -func (p *Publisher) Shutdown() { - for _, client := range p.publishClients { - client.CloseSend() - } -} diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go deleted file mode 100644 index 53e7ffc7d..000000000 --- a/weed/messaging/client/subscriber.go +++ /dev/null @@ -1,91 +0,0 @@ -package client - -import ( - "context" - "io" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" -) - -type Subscriber struct { - subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient - subscriberId string -} - -func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) { - // read topic configuration - topicConfiguration := &messaging_pb.TopicConfiguration{ - PartitionCount: 4, - } - subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) - - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime) - if err != nil { - return nil, err - } - subscriberClients[i] = client - } - - return &Subscriber{ - subscriberClients: subscriberClients, - subscriberId: subscriberId, - }, nil -} - -func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { - - stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&messaging_pb.SubscriberMessage{ - Init: &messaging_pb.SubscriberMessage_InitMessage{ - Namespace: namespace, - Topic: topic, - Partition: partition, - StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, - TimestampNs: startTime.UnixNano(), - SubscriberId: subscriberId, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - - return stream, nil - -} - -func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { - for { - resp, listenErr := s.subscriberClients[partition].Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - println(listenErr.Error()) - return listenErr - } - processFn(resp.Data) - } -} - -// Subscribe starts goroutines to process the messages -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { - for i := 0; i < len(s.subscriberClients); i++ { - go s.doSubscribe(i, processFn) - } -} |
