diff options
Diffstat (limited to 'weed/msgqueue')
| -rw-r--r-- | weed/msgqueue/configuration.go | 61 | ||||
| -rw-r--r-- | weed/msgqueue/kafka/kafka_queue.go | 3 | ||||
| -rw-r--r-- | weed/msgqueue/log/log_queue.go | 3 | ||||
| -rw-r--r-- | weed/msgqueue/message_queue.go | 7 |
4 files changed, 13 insertions, 61 deletions
diff --git a/weed/msgqueue/configuration.go b/weed/msgqueue/configuration.go index 525809d73..d053f892f 100644 --- a/weed/msgqueue/configuration.go +++ b/weed/msgqueue/configuration.go @@ -1,82 +1,29 @@ package msgqueue import ( - "os" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" ) -const ( - MSG_QUEUE_TOML_EXAMPLE = ` -# A sample TOML config file for SeaweedFS message queue - -[log] -enabled = true - -[kafka] -enabled = false -hosts = [ - "localhost:9092" -] -topic = "seaweedfs_filer" - -` -) - var ( MessageQueues []MessageQueue Queue MessageQueue ) -func LoadConfiguration() { - - // find a filer store - viper.SetConfigName("message_queue") // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - if err := viper.ReadInConfig(); err != nil { // Handle errors reading the config file - glog.Fatalf("Failed to load message_queue.toml file from current directory, or $HOME/.seaweedfs/, "+ - "or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a message_queue.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+MSG_QUEUE_TOML_EXAMPLE) - } +func LoadConfiguration(config *viper.Viper) { - glog.V(0).Infof("Reading message queue configuration from %s", viper.ConfigFileUsed()) for _, store := range MessageQueues { - if viper.GetBool(store.GetName() + ".enabled") { - viperSub := viper.Sub(store.GetName()) + if config.GetBool(store.GetName() + ".enabled") { + viperSub := config.Sub(store.GetName()) if err := store.Initialize(viperSub); err != nil { glog.Fatalf("Failed to initialize store for %s: %+v", store.GetName(), err) } Queue = store - glog.V(0).Infof("Configure message queue for %s from %s", store.GetName(), viper.ConfigFileUsed()) + glog.V(0).Infof("Configure message queue for %s", store.GetName()) return } } - println() - println("Supported message queues are:") - for _, store := range MessageQueues { - println(" " + store.GetName()) - } - - println() - println("Please configure a supported message queue in", viper.ConfigFileUsed()) - println() - - os.Exit(-1) -} - -// A simplified interface to decouple from Viper -type Configuration interface { - GetString(key string) string - GetBool(key string) bool - GetInt(key string) int - GetInt64(key string) int64 - GetFloat64(key string) float64 - GetStringSlice(key string) []string } diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go index f070fd597..7f0273ad9 100644 --- a/weed/msgqueue/kafka/kafka_queue.go +++ b/weed/msgqueue/kafka/kafka_queue.go @@ -5,6 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/msgqueue" "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -20,7 +21,7 @@ func (k *KafkaQueue) GetName() string { return "kafka" } -func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) { +func (k *KafkaQueue) Initialize(configuration util.Configuration) (err error) { glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts")) glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic")) return k.initialize( diff --git a/weed/msgqueue/log/log_queue.go b/weed/msgqueue/log/log_queue.go index 9ce9ff8be..612212ae0 100644 --- a/weed/msgqueue/log/log_queue.go +++ b/weed/msgqueue/log/log_queue.go @@ -4,6 +4,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/msgqueue" "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -17,7 +18,7 @@ func (k *LogQueue) GetName() string { return "log" } -func (k *LogQueue) Initialize(configuration msgqueue.Configuration) (err error) { +func (k *LogQueue) Initialize(configuration util.Configuration) (err error) { return nil } diff --git a/weed/msgqueue/message_queue.go b/weed/msgqueue/message_queue.go index 6d57b9b3b..a14d9b480 100644 --- a/weed/msgqueue/message_queue.go +++ b/weed/msgqueue/message_queue.go @@ -1,11 +1,14 @@ package msgqueue -import "github.com/golang/protobuf/proto" +import ( + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/util" +) type MessageQueue interface { // GetName gets the name to locate the configuration in message_queue.toml file GetName() string // Initialize initializes the file store - Initialize(configuration Configuration) error + Initialize(configuration util.Configuration) error SendMessage(key string, message proto.Message) error } |
