diff options
Diffstat (limited to 'weed/command/mq_broker.go')
| -rw-r--r-- | weed/command/mq_broker.go | 39 |
1 files changed, 30 insertions, 9 deletions
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go index ac7deac2c..8ea7f96a4 100644 --- a/weed/command/mq_broker.go +++ b/weed/command/mq_broker.go @@ -1,6 +1,10 @@ package command import ( + "fmt" + "net/http" + _ "net/http/pprof" + "google.golang.org/grpc/reflection" "github.com/seaweedfs/seaweedfs/weed/util/grace" @@ -18,15 +22,17 @@ var ( ) type MessageQueueBrokerOptions struct { - masters map[string]pb.ServerAddress - mastersString *string - filerGroup *string - ip *string - port *int - dataCenter *string - rack *string - cpuprofile *string - memprofile *string + masters map[string]pb.ServerAddress + mastersString *string + filerGroup *string + ip *string + port *int + pprofPort *int + dataCenter *string + rack *string + cpuprofile *string + memprofile *string + logFlushInterval *int } func init() { @@ -35,10 +41,12 @@ func init() { mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") + mqBrokerStandaloneOptions.pprofPort = cmdMqBroker.Flag.Int("port.pprof", 0, "HTTP profiling port (0 to disable)") mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack") mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") + mqBrokerStandaloneOptions.logFlushInterval = cmdMqBroker.Flag.Int("logFlushInterval", 5, "log buffer flush interval in seconds") } var cmdMqBroker = &Command{ @@ -77,6 +85,7 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { MaxMB: 0, Ip: *mqBrokerOpt.ip, Port: *mqBrokerOpt.port, + LogFlushInterval: *mqBrokerOpt.logFlushInterval, }, grpcDialOption) if err != nil { glog.Fatalf("failed to create new message broker for queue server: %v", err) @@ -106,6 +115,18 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { }() } + // Start HTTP profiling server if enabled + if mqBrokerOpt.pprofPort != nil && *mqBrokerOpt.pprofPort > 0 { + go func() { + pprofAddr := fmt.Sprintf(":%d", *mqBrokerOpt.pprofPort) + glog.V(0).Infof("MQ Broker pprof server listening on %s", pprofAddr) + glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", *mqBrokerOpt.pprofPort) + if err := http.ListenAndServe(pprofAddr, nil); err != nil { + glog.Errorf("pprof server error: %v", err) + } + }() + } + glog.V(0).Infof("MQ Broker listening on %s:%d", *mqBrokerOpt.ip, *mqBrokerOpt.port) grpcS.Serve(grpcL) |
