diff options
Diffstat (limited to 'weed/msgqueue')
| -rw-r--r-- | weed/msgqueue/configuration.go | 82 | ||||
| -rw-r--r-- | weed/msgqueue/kafka/kafka_queue.go | 76 | ||||
| -rw-r--r-- | weed/msgqueue/log/log_queue.go | 29 | ||||
| -rw-r--r-- | weed/msgqueue/message_queue.go | 11 |
4 files changed, 198 insertions, 0 deletions
diff --git a/weed/msgqueue/configuration.go b/weed/msgqueue/configuration.go new file mode 100644 index 000000000..63f103c1e --- /dev/null +++ b/weed/msgqueue/configuration.go @@ -0,0 +1,82 @@ +package msgqueue + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/spf13/viper" +) + +const ( + MSG_QUEUE_TOML_EXAMPLE = ` +# A sample TOML config file for SeaweedFS message queue + +[log] +enabled = true + +[kafka] +enabled = false +hosts = [ + "localhost:9092" +] +topic = "seaweedfs_filer" + +` +) + +var ( + MessageQueues []MessageQueue + + Queue MessageQueue +) + +func LoadConfiguration() { + + // find a filer store + viper.SetConfigName("message_queue") // name of config file (without extension) + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths + viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in + if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file + glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+ + "or /etc/seaweedfs/"+ + "\n\nPlease follow this example and add a message_queue.toml file to "+ + "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE, err) + } + + glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed()) + for _, store := range MessageQueues { + if viper.GetBool(store.GetName() + ".enabled") { + viperSub := viper.Sub(store.GetName()) + if err := store.Initialize(viperSub); err != nil { + glog.Fatalf("Failed to initialize store for %s: %+v", + store.GetName(), err) + } + Queue = store + glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed()) + return + } + } + + println() + println("Supported message queues are:") + for _, store := range MessageQueues { + println(" " + store.GetName()) + } + + println() + println("Please configure a supported message queue in", viper.ConfigFileUsed()) + println() + + os.Exit(-1) +} + +// A simplified interface to decouple from Viper +type Configuration interface { + GetString(key string) string + GetBool(key string) bool + GetInt(key string) int + GetInt64(key string) int64 + GetFloat64(key string) float64 + GetStringSlice(key string) []string +} diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go new file mode 100644 index 000000000..c013baf5f --- /dev/null +++ b/weed/msgqueue/kafka/kafka_queue.go @@ -0,0 +1,76 @@ +package kafka + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/chrislusf/seaweedfs/weed/msgqueue" + "github.com/golang/protobuf/proto" + "github.com/Shopify/sarama" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func init() { + msgqueue.MessageQueues = append(msgqueue.MessageQueues, &KafkaQueue{}) +} + +type KafkaQueue struct { + topic string + producer sarama.AsyncProducer +} + +func (k *KafkaQueue) GetName() string { + return "kafka" +} + +func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) { + return k.initialize( + configuration.GetStringSlice("hosts"), + configuration.GetString("topic"), + ) +} + +func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) { + config := sarama.NewConfig() + config.Producer.RequiredAcks = sarama.WaitForLocal + config.Producer.Partitioner = sarama.NewHashPartitioner + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + k.producer, err = sarama.NewAsyncProducer(hosts, config) + go k.handleSuccess() + go k.handleError() + return nil +} + +func (k *KafkaQueue) SendMessage(key string, message proto.Message) (err error) { + bytes, err := proto.Marshal(message) + if err != nil { + return + } + + msg := &sarama.ProducerMessage{ + Topic: k.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bytes), + } + + k.producer.Input() <- msg + + return nil +} + +func (k *KafkaQueue) handleSuccess() { + for { + pm := <-k.producer.Successes() + if pm != nil { + glog.Infof("producer message success, partition:%d offset:%d key:%v valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value) + } + } +} + +func (k *KafkaQueue) handleError() { + for { + err := <-k.producer.Errors() + if err != nil { + glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err) + } + } +} diff --git a/weed/msgqueue/log/log_queue.go b/weed/msgqueue/log/log_queue.go new file mode 100644 index 000000000..ef7967e6c --- /dev/null +++ b/weed/msgqueue/log/log_queue.go @@ -0,0 +1,29 @@ +package kafka + +import ( + _ "github.com/go-sql-driver/mysql" + "github.com/chrislusf/seaweedfs/weed/msgqueue" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func init() { + msgqueue.MessageQueues = append(msgqueue.MessageQueues, &LogQueue{}) +} + +type LogQueue struct { +} + +func (k *LogQueue) GetName() string { + return "log" +} + +func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) { + return nil +} + +func (k *LogQueue) SendMessage(key string, message proto.Message) (err error) { + + glog.V(0).Infof("%v: %+v", key, message) + return nil +} diff --git a/weed/msgqueue/message_queue.go b/weed/msgqueue/message_queue.go new file mode 100644 index 000000000..6d57b9b3b --- /dev/null +++ b/weed/msgqueue/message_queue.go @@ -0,0 +1,11 @@ +package msgqueue + +import "github.com/golang/protobuf/proto" + +type MessageQueue interface { + // GetName gets the name to locate the configuration in message_queue.toml file + GetName() string + // Initialize initializes the file store + Initialize(configuration Configuration) error + SendMessage(key string, message proto.Message) error +} |
