diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-16 01:18:30 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-16 01:18:30 -0700 |
| commit | d923ba2206a8238977e740f98f061242d61ed5e6 (patch) | |
| tree | a2fcaf40d5f2398b2b8ad3c2a77a22579597cc1c /weed/notification | |
| parent | bea4f6ca14615de23ba81a84cf61ba0f2a28b7f6 (diff) | |
| download | seaweedfs-d923ba2206a8238977e740f98f061242d61ed5e6.tar.xz seaweedfs-d923ba2206a8238977e740f98f061242d61ed5e6.zip | |
renaming msgqueue to notification
Diffstat (limited to 'weed/notification')
| -rw-r--r-- | weed/notification/configuration.go | 33 | ||||
| -rw-r--r-- | weed/notification/kafka/kafka_queue.go | 79 | ||||
| -rw-r--r-- | weed/notification/log/log_queue.go | 29 | ||||
| -rw-r--r-- | weed/notification/message_queue.go | 14 |
4 files changed, 155 insertions, 0 deletions
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go new file mode 100644 index 000000000..6ac693ad8 --- /dev/null +++ b/weed/notification/configuration.go @@ -0,0 +1,33 @@ +package notification + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" +) + +var ( + MessageQueues []MessageQueue + + Queue MessageQueue +) + +func LoadConfiguration(config *viper.Viper) { + + if config == nil { + return + } + + for _, store := range MessageQueues { + if config.GetBool(store.GetName() + ".enabled") { + viperSub := config.Sub(store.GetName()) + if err := store.Initialize(viperSub); err != nil { + glog.Fatalf("Failed to initialize store for %s: %+v", + store.GetName(), err) + } + Queue = store + glog.V(0).Infof("Configure message queue for %s", store.GetName()) + return + } + } + +} diff --git a/weed/notification/kafka/kafka_queue.go b/weed/notification/kafka/kafka_queue.go new file mode 100644 index 000000000..8eb41cf85 --- /dev/null +++ b/weed/notification/kafka/kafka_queue.go @@ -0,0 +1,79 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &KafkaQueue{}) +} + +type KafkaQueue struct { + topic string + producer sarama.AsyncProducer +} + +func (k *KafkaQueue) GetName() string { + return "kafka" +} + +func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { + glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) + glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic")) + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + ) +} + +func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) { + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForLocal + config.Producer.Partitioner = sarama.NewHashPartitioner + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + k.producer, err = sarama.NewAsyncProducer(hosts, config) + k.topic = topic + go k.handleSuccess() + go k.handleError() + return nil +} + +func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) { + bytes, err := proto.Marshal(message) + if err != nil { + return + } + + msg := &sarama.ProducerMessage{ + Topic: k.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bytes), + } + + k.producer.Input() <- msg + + return nil +} + +func (k *KafkaQueue) handleSuccess() { + for { + pm := <-k.producer.Successes() + if pm != nil { + glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key) + } + } +} + +func (k *KafkaQueue) handleError() { + for { + err := <-k.producer.Errors() + if err != nil { + glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic) + } + } +} diff --git a/weed/notification/log/log_queue.go b/weed/notification/log/log_queue.go new file mode 100644 index 000000000..dcc038dfc --- /dev/null +++ b/weed/notification/log/log_queue.go @@ -0,0 +1,29 @@ +package kafka + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +func init() { + notification.MessageQueues = append(notification.MessageQueues, &LogQueue{}) +} + +type LogQueue struct { +} + +func (k *LogQueue) GetName() string { + return "log" +} + +func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { + return nil +} + +func (k *LogQueue) SendMessage(key string, message proto.Message) (err error) { + + glog.V(0).Infof("%v: %+v", key, message) + return nil +} diff --git a/weed/notification/message_queue.go b/weed/notification/message_queue.go new file mode 100644 index 000000000..18c4a8830 --- /dev/null +++ b/weed/notification/message_queue.go @@ -0,0 +1,14 @@ +package notification + +import ( + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +type MessageQueue interface { + // GetName gets the name to locate the configuration in message_queue.toml file + GetName() string + // Initialize initializes the file store + Initialize(configuration util.Configuration) error + SendMessage(key string, message proto.Message) error +} |
