aboutsummaryrefslogtreecommitdiff
path: root/weed/msgqueue/kafka/kafka_queue.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-13 01:20:49 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-13 01:20:49 -0700
commitf036ef8a3c50af3c933dcd96026ca70dc5fd0da3 (patch)
treec4bc38f75396b44476d8cdaad28b6180af3c6291 /weed/msgqueue/kafka/kafka_queue.go
parent75d63db60d1677f2e3350c3ee2b9dbecf931ec1a (diff)
downloadseaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.tar.xz
seaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.zip
add filer notification
Diffstat (limited to 'weed/msgqueue/kafka/kafka_queue.go')
-rw-r--r--weed/msgqueue/kafka/kafka_queue.go76
1 files changed, 76 insertions, 0 deletions
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go
new file mode 100644
index 000000000..c013baf5f
--- /dev/null
+++ b/weed/msgqueue/kafka/kafka_queue.go
@@ -0,0 +1,76 @@
+package kafka
+
+import (
+ _ "github.com/go-sql-driver/mysql"
+ "github.com/chrislusf/seaweedfs/weed/msgqueue"
+ "github.com/golang/protobuf/proto"
+ "github.com/Shopify/sarama"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func init() {
+ msgqueue.MessageQueues = append(msgqueue.MessageQueues, &KafkaQueue{})
+}
+
+type KafkaQueue struct {
+ topic string
+ producer sarama.AsyncProducer
+}
+
+func (k *KafkaQueue) GetName() string {
+ return "kafka"
+}
+
+func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+ return k.initialize(
+ configuration.GetStringSlice("hosts"),
+ configuration.GetString("topic"),
+ )
+}
+
+func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
+ config := sarama.NewConfig()
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+ config.Producer.Partitioner = sarama.NewHashPartitioner
+ config.Producer.Return.Successes = true
+ config.Producer.Return.Errors = true
+ k.producer, err = sarama.NewAsyncProducer(hosts, config)
+ go k.handleSuccess()
+ go k.handleError()
+ return nil
+}
+
+func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
+ bytes, err := proto.Marshal(message)
+ if err != nil {
+ return
+ }
+
+ msg := &sarama.ProducerMessage{
+ Topic: k.topic,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(bytes),
+ }
+
+ k.producer.Input() <- msg
+
+ return nil
+}
+
+func (k *KafkaQueue) handleSuccess() {
+ for {
+ pm := <-k.producer.Successes()
+ if pm != nil {
+ glog.Infof("producer message success, partition:%d offset:%d key:%v valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value)
+ }
+ }
+}
+
+func (k *KafkaQueue) handleError() {
+ for {
+ err := <-k.producer.Errors()
+ if err != nil {
+ glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err)
+ }
+ }
+}