diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-17 17:33:53 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-17 17:33:53 -0700 |
| commit | 3a57aef7a910ba66d6026f73d1b3055c2d8a1e4d (patch) | |
| tree | 88e05e31b10ed917df8cc8683bc5ef3f807315e8 /weed/messaging/msgclient/subscriber.go | |
| parent | 95ca9dd8a2de6af3eb030880123dded9ed6de602 (diff) | |
| download | seaweedfs-3a57aef7a910ba66d6026f73d1b3055c2d8a1e4d.tar.xz seaweedfs-3a57aef7a910ba66d6026f73d1b3055c2d8a1e4d.zip | |
sync subscribe()
Diffstat (limited to 'weed/messaging/msgclient/subscriber.go')
| -rw-r--r-- | weed/messaging/msgclient/subscriber.go | 9 |
1 files changed, 8 insertions, 1 deletions
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go index 926e193dd..01e63df40 100644 --- a/weed/messaging/msgclient/subscriber.go +++ b/weed/messaging/msgclient/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "io" "time" + "sync" "github.com/chrislusf/seaweedfs/weed/messaging/broker" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" @@ -97,11 +98,17 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, // Subscribe starts goroutines to process the messages func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) { + var wg sync.WaitGroup for i := 0; i < len(s.subscriberClients); i++ { if s.subscriberClients[i] != nil { - go doSubscribe(s.subscriberClients[i], processFn) + wg.Add(1) + go func() { + defer wg.Done() + doSubscribe(s.subscriberClients[i], processFn) + }() } } + wg.Wait() } func (s *Subscriber) Shutdown() { |
