diff options
Diffstat (limited to 'test/postgres/producer.go')
| -rw-r--r-- | test/postgres/producer.go | 27 |
1 files changed, 8 insertions, 19 deletions
diff --git a/test/postgres/producer.go b/test/postgres/producer.go index 20a72993f..ecaea6344 100644 --- a/test/postgres/producer.go +++ b/test/postgres/producer.go @@ -8,7 +8,6 @@ import ( "math/big" "math/rand" "os" - "strconv" "strings" "time" @@ -16,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -103,7 +103,7 @@ func main() { log.Printf("Error creating topic %s.%s: %v", topicConfig.namespace, topicConfig.topic, err) } else { - log.Printf("✓ Successfully created %s.%s", + log.Printf("-Successfully created %s.%s", topicConfig.namespace, topicConfig.topic) } @@ -111,7 +111,7 @@ func main() { time.Sleep(2 * time.Second) } - log.Println("✓ MQ test data creation completed!") + log.Println("-MQ test data creation completed!") log.Println("\nCreated namespaces:") log.Println(" - analytics (user_events, system_logs, metrics)") log.Println(" - ecommerce (product_views, user_events)") @@ -292,24 +292,12 @@ func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) { return &schema_pb.RecordValue{Fields: fields}, nil } -// convertHTTPToGRPC converts HTTP address to gRPC address -// Follows SeaweedFS convention: gRPC port = HTTP port + 10000 -func convertHTTPToGRPC(httpAddress string) string { - if strings.Contains(httpAddress, ":") { - parts := strings.Split(httpAddress, ":") - if len(parts) == 2 { - if port, err := strconv.Atoi(parts[1]); err == nil { - return fmt.Sprintf("%s:%d", parts[0], port+10000) - } - } - } - // Fallback: return original address if conversion fails - return httpAddress -} +// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this // discoverFiler finds a filer from the master server func discoverFiler(masterHTTPAddress string) (string, error) { - masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) + httpAddr := pb.ServerAddress(masterHTTPAddress) + masterGRPCAddress := httpAddr.ToGrpcAddress() conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -334,7 +322,8 @@ func discoverFiler(masterHTTPAddress string) (string, error) { // Use the first available filer and convert HTTP address to gRPC filerHTTPAddress := resp.ClusterNodes[0].Address - return convertHTTPToGRPC(filerHTTPAddress), nil + httpAddr := pb.ServerAddress(filerHTTPAddress) + return httpAddr.ToGrpcAddress(), nil } // discoverBroker finds the broker balancer using filer lock mechanism |
