aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/command.go2
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/filer_copy.go20
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/mount.go21
-rw-r--r--weed/command/mount_std.go7
-rw-r--r--weed/command/msg_broker.go111
-rw-r--r--weed/command/queue.go107
-rw-r--r--weed/command/s3.go5
-rw-r--r--weed/command/scaffold.go2
-rw-r--r--weed/command/volume.go3
-rw-r--r--weed/command/webdav.go3
12 files changed, 134 insertions, 153 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 6687469f1..9dc51e922 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -20,7 +20,7 @@ var Commands = []*Command{
cmdS3,
cmdUpload,
cmdDownload,
- cmdQueue,
+ cmdMsgBroker,
cmdScaffold,
cmdShell,
cmdVersion,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 31e65acea..b5b595215 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -9,6 +9,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -144,7 +145,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 18f41048b..3e7ae1db2 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -17,6 +17,7 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -159,7 +160,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -274,7 +275,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if task.fileSize > 0 {
// assign a volume
- err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -319,7 +320,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -375,7 +376,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
- err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -447,7 +448,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return uploadError
}
- if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -496,12 +497,3 @@ func detectMimeType(f *os.File) string {
mimeType := http.DetectContentType(head[:n])
return mimeType
}
-
-func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return util.WithCachedGrpcClient(func(clientConn *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(clientConn)
- return fn(client)
- }, filerAddress, grpcDialOption)
-
-}
diff --git a/weed/command/master.go b/weed/command/master.go
index c4b11119b..1be60426f 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -129,7 +130,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 4bdb3415a..e73cbee10 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,11 +1,5 @@
package command
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
type MountOptions struct {
filer *string
filerMountRootPath *string
@@ -69,18 +63,3 @@ var cmdMount = &Command{
`,
}
-func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
-}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index e8e3fb030..b195bf143 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -17,6 +17,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -135,16 +136,16 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
})
// parse filer grpc address
- filerGrpcAddress, err := parseFilerGrpcAddress(filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
if err != nil {
- glog.V(0).Infof("parseFilerGrpcAddress: %v", err)
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
daemonize.SignalOutcome(err)
return true
}
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
new file mode 100644
index 000000000..0d69a9a66
--- /dev/null
+++ b/weed/command/msg_broker.go
@@ -0,0 +1,111 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ messageBrokerStandaloneOptions QueueOptions
+)
+
+type QueueOptions struct {
+ filer *string
+ port *int
+ tlsPrivateKey *string
+ tlsCertificate *string
+ defaultTtl *string
+}
+
+func init() {
+ cmdMsgBroker.Run = runMsgBroker // break init cycle
+ messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
+ messageBrokerStandaloneOptions.tlsPrivateKey = cmdMsgBroker.Flag.String("key.file", "", "path to the TLS private key file")
+ messageBrokerStandaloneOptions.tlsCertificate = cmdMsgBroker.Flag.String("cert.file", "", "path to the TLS certificate file")
+ messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+}
+
+var cmdMsgBroker = &Command{
+ UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
+ Short: "<WIP> start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMsgBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return messageBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ filerQueuesPath := "/queues"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerQueuesPath = resp.DirQueues
+ glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
+ Filers: []string{*msgBrokerOpt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Port: *msgBrokerOpt.port,
+ })
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/queue.go b/weed/command/queue.go
deleted file mode 100644
index d09d5d8b3..000000000
--- a/weed/command/queue.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package command
-
-import (
- "context"
- "fmt"
- "strconv"
- "time"
-
- "google.golang.org/grpc/reflection"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- queueStandaloneOptions QueueOptions
-)
-
-type QueueOptions struct {
- filer *string
- port *int
- tlsPrivateKey *string
- tlsCertificate *string
- defaultTtl *string
-}
-
-func init() {
- cmdQueue.Run = runQueue // break init cycle
- queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address")
- queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port")
- queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file")
- queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file")
- queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
-}
-
-var cmdQueue = &Command{
- UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]",
- Short: "start a queue gRPC server that is backed by a filer",
- Long: `start a queue gRPC server that is backed by a filer.
-
-`,
-}
-
-func runQueue(cmd *Command, args []string) bool {
-
- util.LoadConfiguration("security", false)
-
- return queueStandaloneOptions.startQueueServer()
-
-}
-
-func (queueopt *QueueOptions) startQueueServer() bool {
-
- filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
-
- filerQueuesPath := "/queues"
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
- for {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
- }
- filerQueuesPath = resp.DirQueues
- glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
- return nil
- })
- if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
- time.Sleep(time.Second)
- } else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
- break
- }
- }
-
- qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{
- Filers: []string{*queueopt.filer},
- DefaultReplication: "",
- MaxMB: 0,
- Port: *queueopt.port,
- })
-
- // start grpc listener
- grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0)
- if err != nil {
- glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err)
- }
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue"))
- queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
- reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- return true
-
-}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 39d0c04fc..cd4018fbc 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -6,6 +6,7 @@ import (
"net/http"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -117,7 +118,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
@@ -128,7 +129,7 @@ func (s3opt *S3Options) startS3Server() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index fc7f8636d..5b246b7c0 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -326,7 +326,7 @@ key = ""
cert = ""
key = ""
-[grpc.queue]
+[grpc.msg_broker]
cert = ""
key = ""
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 9d665d143..4773d8a55 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -13,6 +13,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util/httpdown"
@@ -234,7 +235,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
go func() {
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 4d5752247..ba88a17be 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -54,7 +55,7 @@ func runWebDav(cmd *Command, args []string) bool {
func (wo *WebDavOption) startWebDav() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
if err != nil {
glog.Fatal(err)
return false