diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-07-16 11:13:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-16 11:13:23 +0800 |
| commit | d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch) | |
| tree | 8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/notification | |
| parent | 01060c992591f412b0d5e180bde29991747a9462 (diff) | |
| parent | 5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff) | |
| download | seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip | |
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/gocdk_pub_sub/gocdk_pub_sub.go | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go new file mode 100644 index 000000000..94a413ac0 --- /dev/null +++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go @@ -0,0 +1,71 @@ +// Package gocdk_pub_sub supports the Go CDK (Cloud Development Kit) PubSub API, +// which in turn supports many providers, including Amazon SNS/SQS, Azure Service Bus, +// Google Cloud PubSub, and RabbitMQ. +// +// In the config, select a provider and topic using a URL. See +// https://godoc.org/gocloud.dev/pubsub and its sub-packages for details. +// +// The Go CDK PubSub API does not support administrative operations like topic +// creation. Create the topic using a UI, CLI or provider-specific API before running +// weed. +// +// The Go CDK obtains credentials via environment variables and other +// provider-specific default mechanisms. See the provider's documentation for +// details. +package gocdk_pub_sub + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "gocloud.dev/pubsub" + _ "gocloud.dev/pubsub/awssnssqs" + _ "gocloud.dev/pubsub/azuresb" + _ "gocloud.dev/pubsub/gcppubsub" + _ "gocloud.dev/pubsub/natspubsub" + _ "gocloud.dev/pubsub/rabbitpubsub" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &GoCDKPubSub{}) +} + +type GoCDKPubSub struct { + topicURL string + topic *pubsub.Topic +} + +func (k *GoCDKPubSub) GetName() string { + return "gocdk_pub_sub" +} + +func (k *GoCDKPubSub) Initialize(config util.Configuration) error { + k.topicURL = config.GetString("topic_url") + glog.V(0).Infof("notification.gocdk_pub_sub.topic_url: %v", k.topicURL) + topic, err := pubsub.OpenTopic(context.Background(), k.topicURL) + if err != nil { + glog.Fatalf("Failed to open topic: %v", err) + } + k.topic = topic + return nil +} + +func (k *GoCDKPubSub) SendMessage(key string, message proto.Message) error { + bytes, err := proto.Marshal(message) + if err != nil { + return err + } + ctx := context.Background() + err = k.topic.Send(ctx, &pubsub.Message{ + Body: bytes, + Metadata: map[string]string{"key": key}, + }) + if err != nil { + return fmt.Errorf("send message via Go CDK pubsub %s: %v", k.topicURL, err) + } + return nil +} |
