diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-21 00:59:55 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-21 00:59:55 -0700 |
| commit | 5c348087dc0dd9a67250f80839908b22000a5591 (patch) | |
| tree | f1263b7db25b700d6616f76ea6cccdfe30710218 /weed/messaging | |
| parent | cb3985be70c2b6eb9a0e00a04a6a02f8ebd650d5 (diff) | |
| download | seaweedfs-5c348087dc0dd9a67250f80839908b22000a5591.tar.xz seaweedfs-5c348087dc0dd9a67250f80839908b22000a5591.zip | |
messaging: able to pub sub multiple partitions
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/client/publisher.go | 60 | ||||
| -rw-r--r-- | weed/messaging/client/subscriber.go | 44 |
2 files changed, 86 insertions, 18 deletions
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go index 4854ab92e..68e5729c1 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/client/publisher.go @@ -3,14 +3,38 @@ package client import ( "context" + "github.com/OneOfOne/xxhash" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" ) type Publisher struct { - publishClient messaging_pb.SeaweedMessaging_PublishClient + publishClients []messaging_pb.SeaweedMessaging_PublishClient + topicConfiguration *messaging_pb.TopicConfiguration + messageCount uint64 + publisherId string } -func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, error) { +func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupPublisherClient(namespace, topic, int32(i)) + if err != nil { + return nil, err + } + publishClients[i] = client + } + return &Publisher{ + publishClients: publishClients, + topicConfiguration: topicConfiguration, + }, nil +} + +func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) { stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background()) if err != nil { @@ -22,7 +46,7 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er Init: &messaging_pb.PublishRequest_InitMessage{ Namespace: namespace, Topic: topic, - Partition: 0, + Partition: partition, }, }) if err != nil { @@ -56,20 +80,34 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er } }() - return &Publisher{ - publishClient: stream, - }, nil + return stream, nil + } func (p *Publisher) Publish(m *messaging_pb.Message) error { + hashValue := p.messageCount + p.messageCount++ + if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash { + if m.Key != nil { + hashValue = xxhash.Checksum64(m.Key) + } + } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash { + hashValue = xxhash.Checksum64(m.Key) + } else { + // round robin + } - return p.publishClient.Send(&messaging_pb.PublishRequest{ + idx := int(hashValue) % len(p.publishClients) + if idx < 0 { + idx += len(p.publishClients) + } + return p.publishClients[idx].Send(&messaging_pb.PublishRequest{ Data: m, }) - } -func (p *Publisher) Close() error { - - return p.publishClient.CloseSend() +func (p *Publisher) Shutdown() { + for _, client := range p.publishClients { + client.CloseSend() + } } diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go index 407cd4ac6..ddf1f82e6 100644 --- a/weed/messaging/client/subscriber.go +++ b/weed/messaging/client/subscriber.go @@ -9,10 +9,33 @@ import ( ) type Subscriber struct { - subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient + subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberId string } func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) (*Subscriber, error) { + // read topic configuration + topicConfiguration := &messaging_pb.TopicConfiguration{ + PartitionCount: 4, + } + subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i)) + if err != nil { + return nil, err + } + subscriberClients[i] = client + } + + return &Subscriber{ + subscriberClients: subscriberClients, + subscriberId: subscriberId, + }, nil +} + +func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_SubscribeClient, error) { + stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background()) if err != nil { return nil, err @@ -23,7 +46,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) Init: &messaging_pb.SubscriberMessage_InitMessage{ Namespace: namespace, Topic: topic, - Partition: 0, + Partition: partition, StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP, TimestampNs: time.Now().UnixNano(), SubscriberId: subscriberId, @@ -42,20 +65,27 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string) // TODO follow redirection } - return &Subscriber{ - subscriberClient: stream, - }, nil + return stream, nil + } -func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error { +func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error { for { - resp, listenErr := s.subscriberClient.Recv() + resp, listenErr := s.subscriberClients[partition].Recv() if listenErr == io.EOF { return nil } if listenErr != nil { + println(listenErr.Error()) return listenErr } processFn(resp.Data) } } + +// Subscribe starts goroutines to process the messages +func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + for i:=0;i<len(s.subscriberClients);i++{ + go s.doSubscribe(i, processFn) + } +} |
