aboutsummaryrefslogtreecommitdiff
path: root/weed/command/mq_kafka_gateway.go
blob: 614f03e9c85d90b5e77316bd7b410d29175ceec6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package command

import (
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

var (
	mqKafkaGatewayOptions mqKafkaGatewayOpts
)

type mqKafkaGatewayOpts struct {
	ip                *string
	ipBind            *string
	port              *int
	pprofPort         *int
	master            *string
	filerGroup        *string
	schemaRegistryURL *string
	defaultPartitions *int
}

func init() {
	cmdMqKafkaGateway.Run = runMqKafkaGateway
	mqKafkaGatewayOptions.ip = cmdMqKafkaGateway.Flag.String("ip", util.DetectedHostAddress(), "Kafka gateway advertised host address")
	mqKafkaGatewayOptions.ipBind = cmdMqKafkaGateway.Flag.String("ip.bind", "", "Kafka gateway bind address (default: same as -ip)")
	mqKafkaGatewayOptions.port = cmdMqKafkaGateway.Flag.Int("port", 9092, "Kafka gateway listen port")
	mqKafkaGatewayOptions.pprofPort = cmdMqKafkaGateway.Flag.Int("port.pprof", 0, "HTTP profiling port (0 to disable)")
	mqKafkaGatewayOptions.master = cmdMqKafkaGateway.Flag.String("master", "localhost:9333", "comma-separated SeaweedFS master servers")
	mqKafkaGatewayOptions.filerGroup = cmdMqKafkaGateway.Flag.String("filerGroup", "", "filer group name")
	mqKafkaGatewayOptions.schemaRegistryURL = cmdMqKafkaGateway.Flag.String("schema-registry-url", "", "Schema Registry URL (required for schema management)")
	mqKafkaGatewayOptions.defaultPartitions = cmdMqKafkaGateway.Flag.Int("default-partitions", 4, "Default number of partitions for auto-created topics")
}

var cmdMqKafkaGateway = &Command{
	UsageLine: "mq.kafka.gateway [-ip=<host>] [-ip.bind=<bind_addr>] [-port=9092] [-master=<master_servers>] [-filerGroup=<group>] [-default-partitions=4] -schema-registry-url=<url>",
	Short:     "start a Kafka wire-protocol gateway for SeaweedMQ with schema management",
	Long: `Start a Kafka wire-protocol gateway translating Kafka client requests to SeaweedMQ.

Connects to SeaweedFS master servers to discover available brokers and integrates with
Schema Registry for schema-aware topic management.

Options:
  -ip                  Advertised host address that clients should connect to (default: auto-detected)
  -ip.bind             Bind address for the gateway to listen on (default: same as -ip)
                       Use 0.0.0.0 to bind to all interfaces while advertising specific IP
  -port                Listen port (default: 9092)
  -default-partitions  Default number of partitions for auto-created topics (default: 4)
  -schema-registry-url Schema Registry URL (REQUIRED for schema management)

Examples:
  weed mq.kafka.gateway -port=9092 -master=localhost:9333 -schema-registry-url=http://localhost:8081
  weed mq.kafka.gateway -ip=gateway1 -port=9092 -master=master1:9333,master2:9333 -schema-registry-url=http://schema-registry:8081
  weed mq.kafka.gateway -ip=external.host.com -ip.bind=0.0.0.0 -master=localhost:9333 -schema-registry-url=http://schema-registry:8081

This is experimental and currently supports a minimal subset for development.
`,
}

func runMqKafkaGateway(cmd *Command, args []string) bool {
	// Validate required options
	if *mqKafkaGatewayOptions.master == "" {
		glog.Fatalf("SeaweedFS master address is required (-master)")
		return false
	}

	// Schema Registry URL is required for schema management
	if *mqKafkaGatewayOptions.schemaRegistryURL == "" {
		glog.Fatalf("Schema Registry URL is required (-schema-registry-url)")
		return false
	}

	// Determine bind address - default to advertised IP if not specified
	bindIP := *mqKafkaGatewayOptions.ipBind
	if bindIP == "" {
		bindIP = *mqKafkaGatewayOptions.ip
	}

	// Construct listen address from bind IP and port
	listenAddr := fmt.Sprintf("%s:%d", bindIP, *mqKafkaGatewayOptions.port)

	// Set advertised host for Kafka protocol handler
	if err := os.Setenv("KAFKA_ADVERTISED_HOST", *mqKafkaGatewayOptions.ip); err != nil {
		glog.Warningf("Failed to set KAFKA_ADVERTISED_HOST environment variable: %v", err)
	}

	srv := gateway.NewServer(gateway.Options{
		Listen:            listenAddr,
		Masters:           *mqKafkaGatewayOptions.master,
		FilerGroup:        *mqKafkaGatewayOptions.filerGroup,
		SchemaRegistryURL: *mqKafkaGatewayOptions.schemaRegistryURL,
		DefaultPartitions: int32(*mqKafkaGatewayOptions.defaultPartitions),
	})

	glog.Warningf("EXPERIMENTAL FEATURE: MQ Kafka Gateway is experimental and should NOT be used in production environments. It currently supports only a minimal subset of Kafka protocol for development purposes.")

	// Show bind vs advertised addresses for clarity
	if bindIP != *mqKafkaGatewayOptions.ip {
		glog.V(0).Infof("Starting MQ Kafka Gateway: binding to %s, advertising %s:%d to clients",
			listenAddr, *mqKafkaGatewayOptions.ip, *mqKafkaGatewayOptions.port)
	} else {
		glog.V(0).Infof("Starting MQ Kafka Gateway on %s", listenAddr)
	}
	glog.V(0).Infof("Using SeaweedMQ brokers from masters: %s", *mqKafkaGatewayOptions.master)

	// Start HTTP profiling server if enabled
	if *mqKafkaGatewayOptions.pprofPort > 0 {
		go func() {
			pprofAddr := fmt.Sprintf(":%d", *mqKafkaGatewayOptions.pprofPort)
			glog.V(0).Infof("Kafka Gateway pprof server listening on %s", pprofAddr)
			glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", *mqKafkaGatewayOptions.pprofPort)
			if err := http.ListenAndServe(pprofAddr, nil); err != nil {
				glog.Errorf("pprof server error: %v", err)
			}
		}()
	}

	if err := srv.Start(); err != nil {
		glog.Fatalf("mq kafka gateway start: %v", err)
		return false
	}

	// Set up graceful shutdown
	defer func() {
		glog.V(0).Infof("Shutting down MQ Kafka Gateway...")
		if err := srv.Close(); err != nil {
			glog.Errorf("mq kafka gateway close: %v", err)
		}
	}()

	// Serve blocks until closed
	if err := srv.Wait(); err != nil {
		glog.Errorf("mq kafka gateway wait: %v", err)
		return false
	}
	return true
}