diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-17 00:27:56 -0700 |
| commit | 788acdf5275dfb7610afe9144c17d9128d1737f6 (patch) | |
| tree | 55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/replication/notification_kafka.go | |
| parent | 865a0179369601e496d385e47bdbda93dcc3f243 (diff) | |
| download | seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip | |
add WIP filer.replicate
Diffstat (limited to 'weed/replication/notification_kafka.go')
| -rw-r--r-- | weed/replication/notification_kafka.go | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/weed/replication/notification_kafka.go b/weed/replication/notification_kafka.go new file mode 100644 index 000000000..ce3b86ae9 --- /dev/null +++ b/weed/replication/notification_kafka.go @@ -0,0 +1,77 @@ +package replication + +import ( + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + NotificationInputs = append(NotificationInputs, &KafkaInput{ + }) +} + +type KafkaInput struct { + topic string + consumer sarama.Consumer + messageChan chan *sarama.ConsumerMessage +} + +func (k *KafkaInput) GetName() string { + return "kafka" +} + +func (k *KafkaInput) Initialize(configuration util.Configuration) error { + glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) + glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString("topic")) + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + ) +} + +func (k *KafkaInput) initialize(hosts []string, topic string) (err error) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + k.consumer, err = sarama.NewConsumer(hosts, config) + k.topic = topic + k.messageChan = make(chan *sarama.ConsumerMessage, 1) + + partitions, err := k.consumer.Partitions(topic) + if err != nil { + panic(err) + } + + for _, partition := range partitions { + partitionConsumer, err := k.consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) + if err != nil { + panic(err) + } + go func() { + for { + select { + case err := <-partitionConsumer.Errors(): + fmt.Println(err) + case msg := <-partitionConsumer.Messages(): + k.messageChan <- msg + } + } + }() + } + + return nil +} + +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { + + msg := <-k.messageChan + + key = string(msg.Key) + message = &filer_pb.EventNotification{} + err = proto.Unmarshal(msg.Value, message) + + return +} |
