diff options
Diffstat (limited to 'weed/replication/notification_kafka.go')
| -rw-r--r-- | weed/replication/notification_kafka.go | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go new file mode 100644 index 000000000..ce3b86ae9 --- /dev/null +++ b/weed/replication/notification_kafka.go @@ -0,0 +1,77 @@ +package replication + +import ( + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + NotificationInputs = append(NotificationInputs, &KafkaInput{ + }) +} + +type KafkaInput struct { + topic string + consumer sarama.Consumer + messageChan chan *sarama.ConsumerMessage +} + +func (k *KafkaInput) GetName() string { + return "kafka" +} + +func (k *KafkaInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + ) +} + +func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + k.consumer, err = sarama.NewConsumer(hosts, config) + k.topic = topic + k.messageChan = make(chan *sarama.ConsumerMessage, 1) + + partitions, err := k.consumer.Partitions(topic) + if err != nil { + panic(err) + } + + for _, partition := range partitions { + partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + panic(err) + } + go func() { + for { + select { + case err := <-partitionConsumer.Errors(): + fmt.Println(err) + case msg := <-partitionConsumer.Messages(): + k.messageChan <- msg + } + } + }() + } + + return nil +} + +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + msg := <-k.messageChan + + key = string(msg.Key) + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(msg.Value, message) + + return +} |
