diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-09-05 19:57:44 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-09-05 07:57:44 -0700 |
| commit | 9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a (patch) | |
| tree | 11af2832875fb47ba1a8cabae4a16bbc3e3d0930 | |
| parent | 916673ae24861fc7ad169dd3f580962d1a65b39a (diff) | |
| download | seaweedfs-9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a.tar.xz seaweedfs-9b2b7d4f5a187cb7d9f7c4833a5333847db35e8a.zip | |
avoid data race on GoCDKPubSub.topic (#3596)
| -rw-r--r-- | weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go index fcd8e61aa..131345f9c 100644 --- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/proto" "net/url" "path" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -49,33 +50,44 @@ func getPath(rawUrl string) string { } type GoCDKPubSub struct { - topicURL string - topic *pubsub.Topic + topicURL string + topic *pubsub.Topic + topicLock sync.RWMutex } func (k *GoCDKPubSub) GetName() string { return "gocdk_pub_sub" } +func (k *GoCDKPubSub) setTopic(topic *pubsub.Topic) { + k.topicLock.Lock() + k.topic = topic + k.topicLock.Unlock() + k.doReconnect() +} + func (k *GoCDKPubSub) doReconnect() { var conn *amqp.Connection + k.topicLock.RLock() + defer k.topicLock.RUnlock() if k.topic.As(&conn) { - go func() { - <-conn.NotifyClose(make(chan *amqp.Error)) - conn.Close() + go func(c *amqp.Connection) { + <-c.NotifyClose(make(chan *amqp.Error)) + c.Close() + k.topicLock.RLock() k.topic.Shutdown(context.Background()) + k.topicLock.RUnlock() for { glog.Info("Try reconnect") conn, err := amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) if err == nil { - k.topic = rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil) - k.doReconnect() + k.setTopic(rabbitpubsub.OpenTopic(conn, getPath(k.topicURL), nil)) break } glog.Error(err) time.Sleep(time.Second) } - }() + }(conn) } } @@ -86,8 +98,7 @@ func (k *GoCDKPubSub) Initialize(configuration util.Configuration, prefix string if err != nil { glog.Fatalf("Failed to open topic: %v", err) } - k.topic = topic - k.doReconnect() + k.setTopic(topic) return nil } @@ -96,6 +107,8 @@ func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error { if err != nil { return err } + k.topicLock.RLock() + defer k.topicLock.RUnlock() err = k.topic.Send(context.Background(), &pubsub.Message{ Body: bytes, Metadata: map[string]string{"key": key}, |
