diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-16 01:18:30 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-16 01:18:30 -0700 |
| commit | d923ba2206a8238977e740f98f061242d61ed5e6 (patch) | |
| tree | a2fcaf40d5f2398b2b8ad3c2a77a22579597cc1c /weed/msgqueue/kafka/kafka_queue.go | |
| parent | bea4f6ca14615de23ba81a84cf61ba0f2a28b7f6 (diff) | |
| download | seaweedfs-d923ba2206a8238977e740f98f061242d61ed5e6.tar.xz seaweedfs-d923ba2206a8238977e740f98f061242d61ed5e6.zip | |
renaming msgqueue to notification
Diffstat (limited to 'weed/msgqueue/kafka/kafka_queue.go')
| -rw-r--r-- | weed/msgqueue/kafka/kafka_queue.go | 79 |
1 file changed, 0 insertions, 79 deletions
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go deleted file mode 100644 index f373395f4..000000000 --- a/weed/msgqueue/kafka/kafka_queue.go +++ /dev/null @@ -1,79 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/msgqueue" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" -) - -func init() { - msgqueue.MessageQueues = append(msgqueue.MessageQueues, &KafkaQueue{}) -} - -type KafkaQueue struct { - topic string - producer sarama.AsyncProducer -} - -func (k *KafkaQueue) GetName() string { - return "kafka" -} - -func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { - glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) - glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic")) - return k.initialize( - configuration.GetStringSlice("hosts"), - configuration.GetString("topic"), - ) -} - -func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) { - config := sarama.NewConfig() - config.Producer.RequiredAcks = sarama.WaitForLocal - config.Producer.Partitioner = sarama.NewHashPartitioner - config.Producer.Return.Successes = true - config.Producer.Return.Errors = true - k.producer, err = sarama.NewAsyncProducer(hosts, config) - k.topic = topic - go k.handleSuccess() - go k.handleError() - return nil -} - -func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) { - bytes, err := proto.Marshal(message) - if err != nil { - return - } - - msg := &sarama.ProducerMessage{ - Topic: k.topic, - Key: sarama.StringEncoder(key), - Value: sarama.ByteEncoder(bytes), - } - - k.producer.Input() <- msg - - return nil -} - -func (k *KafkaQueue) handleSuccess() { - for { - pm := <-k.producer.Successes() - if pm != nil { - glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", 
pm.Partition, pm.Offset, pm.Key) - } - } -} - -func (k *KafkaQueue) handleError() { - for { - err := <-k.producer.Errors() - if err != nil { - glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic) - } - } -} |
