diff options
| author | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
| commit | 0d62be44846354c3c37b857028297edd4b8df17b (patch) | |
| tree | c89320a7d58351030f1b740c7267f56bf0206429 /weed/replication/sub | |
| parent | d8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff) | |
| download | seaweedfs-origin/changing-to-zap.tar.xz seaweedfs-origin/changing-to-zap.zip | |
Diffstat (limited to 'weed/replication/sub')
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 8 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 26 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 18 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 14 |
4 files changed, 33 insertions, 33 deletions
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index 7fc5c3f46..0456961ae 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -8,7 +8,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" @@ -28,8 +28,8 @@ func (k *AwsSqsInput) GetName() string { } func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) + log.V(3).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + log.V(3).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), @@ -110,7 +110,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif }) if err != nil { - glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) + log.V(2).Infof("delete message from sqs %s: %v", k.queueUrl, err) } return diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 2e7640af4..bfeb0ebb8 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -6,7 +6,7 @@ package sub import ( "context" amqp "github.com/rabbitmq/amqp091-go" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "gocloud.dev/pubsub" @@ -41,38 +41,38 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str queueNameDLX := "DLX." + queueName ch, err := conn.Channel() if err != nil { - glog.Error(err) + log.Error(err) return err } defer ch.Close() if err := ch.ExchangeDeclare( exchangeNameDLX, "fanout", true, false, false, false, nil); err != nil { - glog.Error(err) + log.Error(err) return err } if err := ch.ExchangeDeclare( exchangeName, "fanout", true, false, false, false, nil); err != nil { - glog.Error(err) + log.Error(err) return err } if _, err := ch.QueueDeclare( queueName, true, false, false, false, amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil { - glog.Error(err) + log.Error(err) return err } if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil { - glog.Error(err) + log.Error(err) return err } if _, err := ch.QueueDeclare( queueNameDLX, true, false, false, false, amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil { - glog.Error(err) + log.Error(err) return err } if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil { - glog.Error(err) + log.Error(err) return err } return nil @@ -90,7 +90,7 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { topicUrl := configuration.GetString(prefix + "topic_url") k.subURL = configuration.GetString(prefix + "sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL) + log.V(3).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL) sub, err := pubsub.OpenSubscription(context.Background(), k.subURL) if err != nil { return err @@ -127,7 +127,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event k.sub.Shutdown(ctx) conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL")) if err != nil { - glog.Error(err) + log.Error(err) time.Sleep(time.Second) return } @@ -135,7 +135,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event return } // This is permanent cached sub err - glog.Fatal(err) + log.Fatal(err) } onFailureFn = func() { if msg.Nackable() { @@ -143,11 +143,11 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event var delivery amqp.Delivery if msg.As(&delivery) { isRedelivered = delivery.Redelivered - glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered) + log.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered) } if isRedelivered { if err := delivery.Nack(false, false); err != nil { - glog.Error(err) + log.Error(err) } } else { msg.Nack() diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index c7509abf2..f431a2c1d 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -6,7 +6,7 @@ import ( "os" "cloud.google.com/go/pubsub" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/api/option" @@ -28,8 +28,8 @@ func (k *GooglePubSubInput) GetName() string { } func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) + log.V(3).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + log.V(3).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( configuration.GetString(prefix+"google_application_credentials"), configuration.GetString(prefix+"project_id"), @@ -45,13 +45,13 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") + log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") } } client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) if err != nil { - glog.Fatalf("Failed to create client: %v", err) + log.Fatalf("Failed to create client: %v", err) } k.topicName = topicName @@ -60,11 +60,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId if !exists { topic, err = client.CreateTopic(ctx, topicName) if err != nil { - glog.Fatalf("Failed to create topic %s: %v", topicName, err) + log.Fatalf("Failed to create topic %s: %v", topicName, err) } } } else { - glog.Fatalf("Failed to check topic %s: %v", topicName, err) + log.Fatalf("Failed to check topic %s: %v", topicName, err) } subscriptionName := "seaweedfs_sub" @@ -74,11 +74,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId if !exists { k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic}) if err != nil { - glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) + log.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) } } } else { - glog.Fatalf("Failed to check subscription %s: %v", topicName, err) + log.Fatalf("Failed to check subscription %s: %v", topicName, err) } k.messageChan = make(chan *pubsub.Message, 1) diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index 92f7ce609..4738b5612 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -8,7 +8,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" @@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string { } func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) - glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) + log.V(3).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + log.V(3).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"topic"), @@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, if err != nil { panic(err) } else { - glog.V(0).Infof("connected to %v", hosts) + log.V(3).Infof("connected to %v", hosts) } k.topic = topic @@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, case msg := <-partitionConsumer.Messages(): k.messageChan <- msg if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { - glog.Warningf("set kafka offset: %v", err) + log.Warningf("set kafka offset: %v", err) } } } @@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress { progress := &KafkaProgress{} data, err := os.ReadFile(offsetFile) if err != nil { - glog.Warningf("failed to read kafka progress file: %s", offsetFile) + log.Warningf("failed to read kafka progress file: %s", offsetFile) return nil } err = json.Unmarshal(data, progress) if err != nil { - glog.Warningf("failed to read kafka progress message: %s", string(data)) + log.Warningf("failed to read kafka progress message: %s", string(data)) return nil } return progress |
