aboutsummaryrefslogtreecommitdiff
path: root/weed/command/queue.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-27 00:07:13 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-27 00:07:13 -0800
commit555413d9fc4837302ef1c5b2b921b406c9de6777 (patch)
treedfd0f67586df0320fd9a77288651475d2ccdcc6c /weed/command/queue.go
parent430b5a49e2abd9e3751e8c5840a690541ddc66ca (diff)
downloadseaweedfs-555413d9fc4837302ef1c5b2b921b406c9de6777.tar.xz
seaweedfs-555413d9fc4837302ef1c5b2b921b406c9de6777.zip
weed queue starts
Diffstat (limited to 'weed/command/queue.go')
-rw-r--r--weed/command/queue.go107
1 files changed, 107 insertions, 0 deletions
diff --git a/weed/command/queue.go b/weed/command/queue.go
new file mode 100644
index 000000000..d09d5d8b3
--- /dev/null
+++ b/weed/command/queue.go
@@ -0,0 +1,107 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ queueStandaloneOptions QueueOptions
+)
+
+type QueueOptions struct {
+ filer *string
+ port *int
+ tlsPrivateKey *string
+ tlsCertificate *string
+ defaultTtl *string
+}
+
+func init() {
+ cmdQueue.Run = runQueue // break init cycle
+ queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address")
+ queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port")
+ queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file")
+ queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file")
+ queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+}
+
+var cmdQueue = &Command{
+ UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]",
+ Short: "start a queue gRPC server that is backed by a filer",
+ Long: `start a queue gRPC server that is backed by a filer.
+
+`,
+}
+
+func runQueue(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return queueStandaloneOptions.startQueueServer()
+
+}
+
+func (queueopt *QueueOptions) startQueueServer() bool {
+
+ filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ filerQueuesPath := "/queues"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerQueuesPath = resp.DirQueues
+ glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{
+ Filers: []string{*queueopt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Port: *queueopt.port,
+ })
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err)
+ }
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue"))
+ queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
+ reflection.Register(grpcS)
+ go grpcS.Serve(grpcL)
+
+ return true
+
+}