aboutsummaryrefslogtreecommitdiff
path: root/weed/command/queue.go
blob: d09d5d8b3eff2c97256b3250cfbae54b60560199 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package command

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"google.golang.org/grpc/reflection"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	weed_server "github.com/chrislusf/seaweedfs/weed/server"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// queueStandaloneOptions holds the flag-backed settings used when the queue
// server is started through the "queue" command.
var queueStandaloneOptions QueueOptions

// QueueOptions collects the command-line settings of the queue server.
// Every field is a pointer filled in by flag registration in init().
type QueueOptions struct {
	// filer is the filer server address (flag "filer").
	filer          *string
	// port is the gRPC listen port of the queue server (flag "port").
	port           *int
	// tlsPrivateKey is the path to the TLS private key file (flag "key.file").
	// NOTE(review): not read by startQueueServer in this file.
	tlsPrivateKey  *string
	// tlsCertificate is the path to the TLS certificate file (flag "cert.file").
	// NOTE(review): not read by startQueueServer in this file.
	tlsCertificate *string
	// defaultTtl is the time-to-live string, e.g. 1m, 1h, 1d, 1M, 1y (flag "ttl").
	// NOTE(review): not read by startQueueServer in this file.
	defaultTtl     *string
}

// init attaches runQueue to cmdQueue and registers the queue command's flags,
// storing each flag's value pointer in queueStandaloneOptions.
func init() {
	// Assigned here rather than in the cmdQueue literal to break an init cycle.
	cmdQueue.Run = runQueue

	opts := &queueStandaloneOptions

	opts.filer = cmdQueue.Flag.String(
		"filer", "localhost:8888", "filer server address")
	opts.port = cmdQueue.Flag.Int(
		"port", 17777, "queue server gRPC listen port")
	opts.tlsPrivateKey = cmdQueue.Flag.String(
		"key.file", "", "path to the TLS private key file")
	opts.tlsCertificate = cmdQueue.Flag.String(
		"cert.file", "", "path to the TLS certificate file")
	opts.defaultTtl = cmdQueue.Flag.String(
		"ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
}

// cmdQueue describes the "queue" subcommand, which starts a queue gRPC server
// backed by a filer. Its Run field is assigned in init() rather than here
// (see the "break init cycle" note there). The usage line marks it as WIP.
var cmdQueue = &Command{
	UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]",
	Short:     "start a queue gRPC server that is backed by a filer",
	Long: `start a queue gRPC server that is backed by a filer.

`,
}

// runQueue is the entry point of the "queue" command. It loads the "security"
// configuration and then starts the queue server from the standalone options,
// returning whatever startQueueServer reports. cmd and args are not used.
func runQueue(cmd *Command, args []string) bool {
	// The second argument is passed as false, as in the original call.
	util.LoadConfiguration("security", false)

	started := queueStandaloneOptions.startQueueServer()
	return started
}

// startQueueServer connects to the configured filer and then starts the queue
// gRPC server on the configured port.
//
// Steps:
//  1. Derive the filer gRPC address from the filer flag.
//  2. Retry once per second until the filer configuration can be fetched.
//  3. Create the queue server, open the gRPC listener, register the service
//     and gRPC reflection, and serve from a background goroutine.
//
// Failure to parse the filer address, to create the queue server, or to
// listen on the gRPC port is reported through glog.Fatal/Fatalf. The function
// returns true once serving has been started in the background.
//
// NOTE(review): because Serve runs in a goroutine, this function returns
// immediately; confirm the caller keeps the process alive afterwards.
func (queueopt *QueueOptions) startQueueServer() bool {

	filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer)
	if err != nil {
		glog.Fatal(err)
		return false
	}

	// Initial value; replaced by the directory the filer reports below.
	filerQueuesPath := "/queues"

	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	// Block until the filer answers a configuration request.
	for {
		err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			resp, confErr := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
			if confErr != nil {
				return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, confErr)
			}
			filerQueuesPath = resp.DirQueues
			glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
			return nil
		})
		if err == nil {
			glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
			break
		}
		glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
		time.Sleep(time.Second)
	}

	qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{
		Filers:             []string{*queueopt.filer},
		DefaultReplication: "",
		MaxMB:              0,
		Port:               *queueopt.port,
	})
	// Previously this error was silently overwritten by the listener's error
	// below, so a failed construction went unnoticed.
	if err != nil {
		glog.Fatalf("failed to create queue server: %v", err)
	}

	// start grpc listener
	grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0)
	if err != nil {
		glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err)
	}
	grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue"))
	queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
	reflection.Register(grpcS)
	go func() {
		// Surface a serving failure instead of dropping it.
		if serveErr := grpcS.Serve(grpcL); serveErr != nil {
			glog.Errorf("queue grpc server on port %d stopped: %v", *queueopt.port, serveErr)
		}
	}()

	return true

}