aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-19 01:27:30 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-19 01:27:30 -0700
commitee920d4f834b88d748a3f626266859cd27d8bdfd (patch)
treed34dc020333f7a6f66236d5666f843e8e0056693
parent708acee502918080b286cb96114a3789b9f1183d (diff)
downloadseaweedfs-ee920d4f834b88d748a3f626266859cd27d8bdfd.tar.xz
seaweedfs-ee920d4f834b88d748a3f626266859cd27d8bdfd.zip
kafka set the right topic
-rw-r--r--weed/filer2/filer.go2
-rw-r--r--weed/msgqueue/kafka/kafka_queue.go7
2 files changed, 6 insertions, 3 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 475e79a17..84cac5c27 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -174,7 +174,7 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
if p == "/" {
return nil
}
- glog.V(0).Infof("deleting entry %v", p)
+ glog.V(3).Infof("deleting entry %v", p)
f.NotifyUpdateEvent(entry, nil)
diff --git a/weed/msgqueue/kafka/kafka_queue.go b/weed/msgqueue/kafka/kafka_queue.go
index 72bfd657a..f070fd597 100644
--- a/weed/msgqueue/kafka/kafka_queue.go
+++ b/weed/msgqueue/kafka/kafka_queue.go
@@ -21,6 +21,8 @@ func (k *KafkaQueue) GetName() string {
}
func (k *KafkaQueue) Initialize(configuration msgqueue.Configuration) (err error) {
+ glog.V(0).Infof("filer.msgqueue.kafka.hosts: %v\n", configuration.GetStringSlice("hosts"))
+ glog.V(0).Infof("filer.msgqueue.kafka.topic: %v\n", configuration.GetString("topic"))
return k.initialize(
configuration.GetStringSlice("hosts"),
configuration.GetString("topic"),
@@ -34,6 +36,7 @@ func (k *KafkaQueue) initialize(hosts []string, topic string) (err error) {
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
k.producer, err = sarama.NewAsyncProducer(hosts, config)
+ k.topic = topic
go k.handleSuccess()
go k.handleError()
return nil
@@ -60,7 +63,7 @@ func (k *KafkaQueue) handleSuccess() {
for {
pm := <-k.producer.Successes()
if pm != nil {
- glog.Infof("producer message success, partition:%d offset:%d key:%v valus:%s", pm.Partition, pm.Offset, pm.Key, pm.Value)
+ glog.V(3).Infof("producer message success, partition:%d offset:%d key:%v", pm.Partition, pm.Offset, pm.Key)
}
}
}
@@ -69,7 +72,7 @@ func (k *KafkaQueue) handleError() {
for {
err := <-k.producer.Errors()
if err != nil {
- glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v)", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err)
+ glog.Errorf("producer message error, partition:%d offset:%d key:%v valus:%s error(%v) topic:%s", err.Msg.Partition, err.Msg.Offset, err.Msg.Key, err.Msg.Value, err.Err, k.topic)
}
}
}