diff options
| author | HongyanShen <763987993@qq.com> | 2020-03-11 12:55:24 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 12:55:24 +0800 |
| commit | 03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch) | |
| tree | ed8833386a712c850dcef0815509774681a6ab56 /weed/command/msg_broker.go | |
| parent | 0fca1ae776783b37481549df40f477b7d9248d3c (diff) | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| download | seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip | |
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/command/msg_broker.go')
| -rw-r--r-- | weed/command/msg_broker.go | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go new file mode 100644 index 000000000..3e13b4730 --- /dev/null +++ b/weed/command/msg_broker.go @@ -0,0 +1,107 @@ +package command + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" + "github.com/chrislusf/seaweedfs/weed/security" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + messageBrokerStandaloneOptions QueueOptions +) + +type QueueOptions struct { + filer *string + port *int + defaultTtl *string +} + +func init() { + cmdMsgBroker.Run = runMsgBroker // break init cycle + messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") + messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") + messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") +} + +var cmdMsgBroker = &Command{ + UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]", + Short: "<WIP> start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMsgBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + return messageBrokerStandaloneOptions.startQueueServer() + +} + +func (msgBrokerOpt *QueueOptions) startQueueServer() bool { + + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + if err != nil { + glog.Fatal(err) + return false + } + + filerQueuesPath := "/queues" + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + filerQueuesPath = resp.DirQueues + glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + break + } + } + + qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ + Filers: []string{*msgBrokerOpt.filer}, + DefaultReplication: "", + MaxMB: 0, + Port: *msgBrokerOpt.port, + }) + + // start grpc listener + grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + queue_pb.RegisterSeaweedQueueServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} |
