aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/kafka/kafka_queue.go
blob: 6c737c716bd33659d7e1c39ddbbc06f0c574ac21 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/chrislusf/seaweedfs/weed/util/log"
	"github.com/chrislusf/seaweedfs/weed/notification"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/golang/protobuf/proto"
)

func init() {
	notification.MessageQueues = append(notification.MessageQueues, &KafkaQueue{})
}

// KafkaQueue is a notification message queue backend that publishes
// messages to a Kafka topic through an asynchronous sarama producer.
type KafkaQueue struct {
	topic    string               // Kafka topic every message is published to
	producer sarama.AsyncProducer // async producer; results drained by handleSuccess/handleError goroutines
}

// GetName returns the identifier under which this queue backend is
// registered and selected from configuration.
func (k *KafkaQueue) GetName() string {
	const backendName = "kafka"
	return backendName
}

// Initialize reads the Kafka broker hosts and target topic from the given
// configuration (keys under prefix, e.g. "filer.notification.kafka.") and
// sets up the async producer. Returns any error from producer creation.
func (k *KafkaQueue) Initialize(configuration util.Configuration, prefix string) (err error) {
	// Read each setting exactly once and reuse the value for both
	// logging and setup (the original fetched each key twice).
	hosts := configuration.GetStringSlice(prefix + "hosts")
	topic := configuration.GetString(prefix + "topic")
	// Infof-style loggers terminate each entry themselves, so no
	// trailing "\n" in the format string.
	log.Infof("filer.notification.kafka.hosts: %v", hosts)
	log.Infof("filer.notification.kafka.topic: %v", topic)
	return k.initialize(hosts, topic)
}

// initialize creates the asynchronous Kafka producer for the given broker
// hosts, records the target topic, and starts background goroutines that
// drain the producer's success and error result channels.
func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
	config := sarama.NewConfig()
	// Ack as soon as the local broker has the message; partition by key hash.
	config.Producer.RequiredAcks = sarama.WaitForLocal
	config.Producer.Partitioner = sarama.NewHashPartitioner
	// Both result channels must be enabled because we consume them below.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer(hosts, config)
	if err != nil {
		return err
	}
	k.producer = producer
	k.topic = topic

	go k.handleSuccess()
	go k.handleError()
	return nil
}

// SendMessage serializes the given protobuf message and enqueues it on the
// async producer under the given key (which also selects the partition via
// the hash partitioner). Delivery outcomes are reported asynchronously on
// the producer's success/error channels, not through this return value.
func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) {
	data, marshalErr := proto.Marshal(message)
	if marshalErr != nil {
		return marshalErr
	}

	k.producer.Input() <- &sarama.ProducerMessage{
		Topic: k.topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(data),
	}
	return nil
}

// handleSuccess drains the producer's Successes channel, tracing each
// delivered message. Ranging over the channel makes this goroutine exit
// when the producer is closed; the original bare receive loop would spin
// forever on nil values once the closed channel started yielding zeros.
func (k *KafkaQueue) handleSuccess() {
	for pm := range k.producer.Successes() {
		log.Tracef("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
	}
}

// handleError drains the producer's Errors channel, logging each failed
// message with its partition/offset/key/value and the target topic.
// Ranging over the channel makes this goroutine exit when the producer is
// closed; the original bare receive loop would busy-spin on nil values
// once the closed channel started yielding zeros.
func (k *KafkaQueue) handleError() {
	for err := range k.producer.Errors() {
		log.Errorf("producer message error, partition:%d offset:%d key:%v value:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
	}
}