aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sub/notification_kafka.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
committerchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
commit0d62be44846354c3c37b857028297edd4b8df17b (patch)
treec89320a7d58351030f1b740c7267f56bf0206429 /weed/replication/sub/notification_kafka.go
parentd8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff)
downloadseaweedfs-origin/changing-to-zap.tar.xz
seaweedfs-origin/changing-to-zap.zip
Diffstat (limited to 'weed/replication/sub/notification_kafka.go')
-rw-r--r--weed/replication/sub/notification_kafka.go14
1 files changed, 7 insertions, 7 deletions
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 92f7ce609..4738b5612 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/Shopify/sarama"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
@@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string {
}
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
+ log.V(3).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ log.V(3).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"topic"),
@@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
if err != nil {
panic(err)
} else {
- glog.V(0).Infof("connected to %v", hosts)
+ log.V(3).Infof("connected to %v", hosts)
}
k.topic = topic
@@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
case msg := <-partitionConsumer.Messages():
k.messageChan <- msg
if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
- glog.Warningf("set kafka offset: %v", err)
+ log.Warningf("set kafka offset: %v", err)
}
}
}
@@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress {
progress := &KafkaProgress{}
data, err := os.ReadFile(offsetFile)
if err != nil {
- glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ log.Warningf("failed to read kafka progress file: %s", offsetFile)
return nil
}
err = json.Unmarshal(data, progress)
if err != nil {
- glog.Warningf("failed to read kafka progress message: %s", string(data))
+ log.Warningf("failed to read kafka progress message: %s", string(data))
return nil
}
return progress