diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-17 11:10:45 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-17 11:10:45 -0700 |
| commit | 95ca9dd8a2de6af3eb030880123dded9ed6de602 (patch) | |
| tree | bdbec2e5158e1c5f66a7b05356e4fb69ecbdc819 /weed/messaging/msgclient/subscriber.go | |
| parent | f11233cd494b3092753b302166badbefe6bf401a (diff) | |
| download | seaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.tar.xz seaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.zip | |
subscribe support cancel
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index f96bba2ec..926e193dd 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -12,6 +12,7 @@ import ( type Subscriber struct { subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient + subscriberCancels []context.CancelFunc subscriberId string } @@ -21,6 +22,7 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, PartitionCount: 4, } subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount) + subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount) for i := 0; i < int(topicConfiguration.PartitionCount); i++ { if partitionId>=0 && i != partitionId { @@ -35,21 +37,24 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, if err != nil { return nil, err } - client, err := setupSubscriberClient(grpcClientConn, tp, subscriberId, startTime) + ctx, cancel := context.WithCancel(context.Background()) + client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime) if err != nil { return nil, err } subscriberClients[i] = client + subscriberCancels[i] = cancel } return &Subscriber{ subscriberClients: subscriberClients, + subscriberCancels: subscriberCancels, subscriberId: subscriberId, }, nil } -func setupSubscriberClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { - stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(context.Background()) +func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) { + stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx) if err != nil { return } @@ -98,3 +103,11 @@ func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { } } } + +func (s *Subscriber) Shutdown() { + for i := 0; i < len(s.subscriberClients); i++ { + if s.subscriberCancels[i] != nil { + s.subscriberCancels[i]() + } + } +} |
