aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sub
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
committerchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
commit0d62be44846354c3c37b857028297edd4b8df17b (patch)
treec89320a7d58351030f1b740c7267f56bf0206429 /weed/replication/sub
parentd8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff)
downloadseaweedfs-origin/changing-to-zap.tar.xz
seaweedfs-origin/changing-to-zap.zip
Diffstat (limited to 'weed/replication/sub')
-rw-r--r--weed/replication/sub/notification_aws_sqs.go8
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go26
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go18
-rw-r--r--weed/replication/sub/notification_kafka.go14
4 files changed, 33 insertions, 33 deletions
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 7fc5c3f46..0456961ae 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -8,7 +8,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
@@ -28,8 +28,8 @@ func (k *AwsSqsInput) GetName() string {
}
func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
+ log.V(3).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ log.V(3).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -110,7 +110,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
})
if err != nil {
- glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ log.V(2).Infof("delete message from sqs %s: %v", k.queueUrl, err)
}
return
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 2e7640af4..bfeb0ebb8 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -6,7 +6,7 @@ package sub
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"gocloud.dev/pubsub"
@@ -41,38 +41,38 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
queueNameDLX := "DLX." + queueName
ch, err := conn.Channel()
if err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
defer ch.Close()
if err := ch.ExchangeDeclare(
exchangeNameDLX, "fanout", true, false, false, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.ExchangeDeclare(
exchangeName, "fanout", true, false, false, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if _, err := ch.QueueDeclare(
queueName, true, false, false, false,
amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if _, err := ch.QueueDeclare(
queueNameDLX, true, false, false, false,
amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
return nil
@@ -90,7 +90,7 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url")
k.subURL = configuration.GetString(prefix + "sub_url")
- glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
+ log.V(3).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil {
return err
@@ -127,7 +127,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
k.sub.Shutdown(ctx)
conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err != nil {
- glog.Error(err)
+ log.Error(err)
time.Sleep(time.Second)
return
}
@@ -135,7 +135,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
return
}
// This is permanent cached sub err
- glog.Fatal(err)
+ log.Fatal(err)
}
onFailureFn = func() {
if msg.Nackable() {
@@ -143,11 +143,11 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
var delivery amqp.Delivery
if msg.As(&delivery) {
isRedelivered = delivery.Redelivered
- glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
+ log.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
}
if isRedelivered {
if err := delivery.Nack(false, false); err != nil {
- glog.Error(err)
+ log.Error(err)
}
} else {
msg.Nack()
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index c7509abf2..f431a2c1d 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -6,7 +6,7 @@ import (
"os"
"cloud.google.com/go/pubsub"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/api/option"
@@ -28,8 +28,8 @@ func (k *GooglePubSubInput) GetName() string {
}
func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
+ log.V(3).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ log.V(3).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"project_id"),
@@ -45,13 +45,13 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
}
}
client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
k.topicName = topicName
@@ -60,11 +60,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
topic, err = client.CreateTopic(ctx, topicName)
if err != nil {
- glog.Fatalf("Failed to create topic %s: %v", topicName, err)
+ log.Fatalf("Failed to create topic %s: %v", topicName, err)
}
}
} else {
- glog.Fatalf("Failed to check topic %s: %v", topicName, err)
+ log.Fatalf("Failed to check topic %s: %v", topicName, err)
}
subscriptionName := "seaweedfs_sub"
@@ -74,11 +74,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
- glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
+ log.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
}
}
} else {
- glog.Fatalf("Failed to check subscription %s: %v", topicName, err)
+ log.Fatalf("Failed to check subscription %s: %v", topicName, err)
}
k.messageChan = make(chan *pubsub.Message, 1)
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 92f7ce609..4738b5612 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/Shopify/sarama"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
@@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string {
}
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
+ log.V(3).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ log.V(3).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"topic"),
@@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
if err != nil {
panic(err)
} else {
- glog.V(0).Infof("connected to %v", hosts)
+ log.V(3).Infof("connected to %v", hosts)
}
k.topic = topic
@@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
case msg := <-partitionConsumer.Messages():
k.messageChan <- msg
if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
- glog.Warningf("set kafka offset: %v", err)
+ log.Warningf("set kafka offset: %v", err)
}
}
}
@@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress {
progress := &KafkaProgress{}
data, err := os.ReadFile(offsetFile)
if err != nil {
- glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ log.Warningf("failed to read kafka progress file: %s", offsetFile)
return nil
}
err = json.Unmarshal(data, progress)
if err != nil {
- glog.Warningf("failed to read kafka progress message: %s", string(data))
+ log.Warningf("failed to read kafka progress message: %s", string(data))
return nil
}
return progress