aboutsummaryrefslogtreecommitdiff
path: root/test/postgres/producer.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/postgres/producer.go')
-rw-r--r--test/postgres/producer.go27
1 files changed, 8 insertions, 19 deletions
diff --git a/test/postgres/producer.go b/test/postgres/producer.go
index 20a72993f..ecaea6344 100644
--- a/test/postgres/producer.go
+++ b/test/postgres/producer.go
@@ -8,7 +8,6 @@ import (
"math/big"
"math/rand"
"os"
- "strconv"
"strings"
"time"
@@ -16,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -103,7 +103,7 @@ func main() {
log.Printf("Error creating topic %s.%s: %v",
topicConfig.namespace, topicConfig.topic, err)
} else {
- log.Printf("✓ Successfully created %s.%s",
+ log.Printf("-Successfully created %s.%s",
topicConfig.namespace, topicConfig.topic)
}
@@ -111,7 +111,7 @@ func main() {
time.Sleep(2 * time.Second)
}
- log.Println("✓ MQ test data creation completed!")
+ log.Println("-MQ test data creation completed!")
log.Println("\nCreated namespaces:")
log.Println(" - analytics (user_events, system_logs, metrics)")
log.Println(" - ecommerce (product_views, user_events)")
@@ -292,24 +292,12 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
return &schema_pb.RecordValue{Fields: fields}, nil
}
-// convertHTTPToGRPC converts HTTP address to gRPC address
-// Follows SeaweedFS convention: gRPC port = HTTP port + 10000
-func convertHTTPToGRPC(httpAddress string) string {
- if strings.Contains(httpAddress, ":") {
- parts := strings.Split(httpAddress, ":")
- if len(parts) == 2 {
- if port, err := strconv.Atoi(parts[1]); err == nil {
- return fmt.Sprintf("%s:%d", parts[0], port+10000)
- }
- }
- }
- // Fallback: return original address if conversion fails
- return httpAddress
-}
+// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this
// discoverFiler finds a filer from the master server
func discoverFiler(masterHTTPAddress string) (string, error) {
- masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
+ httpAddr := pb.ServerAddress(masterHTTPAddress)
+ masterGRPCAddress := httpAddr.ToGrpcAddress()
conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
@@ -334,7 +322,8 @@ func discoverFiler(masterHTTPAddress string) (string, error) {
// Use the first available filer and convert HTTP address to gRPC
filerHTTPAddress := resp.ClusterNodes[0].Address
- return convertHTTPToGRPC(filerHTTPAddress), nil
+ httpAddr := pb.ServerAddress(filerHTTPAddress)
+ return httpAddr.ToGrpcAddress(), nil
}
// discoverBroker finds the broker balancer using filer lock mechanism