aboutsummaryrefslogtreecommitdiff
path: root/weed/command/mq_broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/mq_broker.go')
-rw-r--r--weed/command/mq_broker.go39
1 files changed, 30 insertions, 9 deletions
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go
index ac7deac2c..8ea7f96a4 100644
--- a/weed/command/mq_broker.go
+++ b/weed/command/mq_broker.go
@@ -1,6 +1,10 @@
package command
import (
+ "fmt"
+ "net/http"
+ _ "net/http/pprof"
+
"google.golang.org/grpc/reflection"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
@@ -18,15 +22,17 @@ var (
)
type MessageQueueBrokerOptions struct {
- masters map[string]pb.ServerAddress
- mastersString *string
- filerGroup *string
- ip *string
- port *int
- dataCenter *string
- rack *string
- cpuprofile *string
- memprofile *string
+ masters map[string]pb.ServerAddress
+ mastersString *string
+ filerGroup *string
+ ip *string
+ port *int
+ pprofPort *int
+ dataCenter *string
+ rack *string
+ cpuprofile *string
+ memprofile *string
+ logFlushInterval *int
}
func init() {
@@ -35,10 +41,12 @@ func init() {
mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ mqBrokerStandaloneOptions.pprofPort = cmdMqBroker.Flag.Int("port.pprof", 0, "HTTP profiling port (0 to disable)")
mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack")
mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
+ mqBrokerStandaloneOptions.logFlushInterval = cmdMqBroker.Flag.Int("logFlushInterval", 5, "log buffer flush interval in seconds")
}
var cmdMqBroker = &Command{
@@ -77,6 +85,7 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
MaxMB: 0,
Ip: *mqBrokerOpt.ip,
Port: *mqBrokerOpt.port,
+ LogFlushInterval: *mqBrokerOpt.logFlushInterval,
}, grpcDialOption)
if err != nil {
glog.Fatalf("failed to create new message broker for queue server: %v", err)
@@ -106,6 +115,18 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
}()
}
+ // Start HTTP profiling server if enabled
+ if mqBrokerOpt.pprofPort != nil && *mqBrokerOpt.pprofPort > 0 {
+ go func() {
+ pprofAddr := fmt.Sprintf(":%d", *mqBrokerOpt.pprofPort)
+ glog.V(0).Infof("MQ Broker pprof server listening on %s", pprofAddr)
+ glog.V(0).Infof("Access profiling at: http://localhost:%d/debug/pprof/", *mqBrokerOpt.pprofPort)
+ if err := http.ListenAndServe(pprofAddr, nil); err != nil {
+ glog.Errorf("pprof server error: %v", err)
+ }
+ }()
+ }
+
glog.V(0).Infof("MQ Broker listening on %s:%d", *mqBrokerOpt.ip, *mqBrokerOpt.port)
grpcS.Serve(grpcL)