diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/command.go | 2 | ||||
| -rw-r--r-- | weed/command/filer.go | 3 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 20 | ||||
| -rw-r--r-- | weed/command/master.go | 3 | ||||
| -rw-r--r-- | weed/command/mount.go | 21 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 7 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 111 | ||||
| -rw-r--r-- | weed/command/queue.go | 107 | ||||
| -rw-r--r-- | weed/command/s3.go | 5 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 2 | ||||
| -rw-r--r-- | weed/command/volume.go | 3 | ||||
| -rw-r--r-- | weed/command/webdav.go | 3 |
12 files changed, 134 insertions, 153 deletions
diff --git a/weed/command/command.go b/weed/command/command.go index 6687469f1..9dc51e922 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -20,7 +20,7 @@ var Commands = []*Command{ cmdS3, cmdUpload, cmdDownload, - cmdQueue, + cmdMsgBroker, cmdScaffold, cmdShell, cmdVersion, diff --git a/weed/command/filer.go b/weed/command/filer.go index 31e65acea..b5b595215 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -144,7 +145,7 @@ func (fo *FilerOptions) startFiler() { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer")) filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) go grpcS.Serve(grpcL) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 18f41048b..3e7ae1db2 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -159,7 +160,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -274,7 +275,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if task.fileSize > 0 { // assign a volume - err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -319,7 +320,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -375,7 +376,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, // assign a volume var assignResult *filer_pb.AssignVolumeResponse var assignError error - err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -447,7 +448,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return uploadError } - if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -496,12 +497,3 @@ func detectMimeType(f *os.File) string { mimeType := http.DetectContentType(head[:n]) return mimeType } - -func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - - return util.WithCachedGrpcClient(func(clientConn *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(clientConn) - return fn(client) - }, filerAddress, grpcDialOption) - -} diff --git a/weed/command/master.go b/weed/command/master.go index c4b11119b..1be60426f 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/reflection" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" @@ -129,7 +130,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err) } // Create your protocol servers. - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master")) master_pb.RegisterSeaweedServer(grpcS, ms) protobuf.RegisterRaftServer(grpcS, raftServer) reflection.Register(grpcS) diff --git a/weed/command/mount.go b/weed/command/mount.go index 4bdb3415a..e73cbee10 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,11 +1,5 @@ package command -import ( - "fmt" - "strconv" - "strings" -) - type MountOptions struct { filer *string filerMountRootPath *string @@ -69,18 +63,3 @@ var cmdMount = &Command{ `, } -func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) { - hostnameAndPort := strings.Split(filer, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort) - } - - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("filer port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil -} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index e8e3fb030..b195bf143 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -17,6 +17,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -135,16 +136,16 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente }) // parse filer grpc address - filerGrpcAddress, err := parseFilerGrpcAddress(filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer) if err != nil { - glog.V(0).Infof("parseFilerGrpcAddress: %v", err) + glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) daemonize.SignalOutcome(err) return true } // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { _, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go new file mode 100644 index 000000000..0d69a9a66 --- /dev/null +++ b/weed/command/msg_broker.go @@ -0,0 +1,111 @@ +package command + +import ( + "context" + "fmt" + "strconv" + "time" + + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" + "github.com/chrislusf/seaweedfs/weed/security" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + messageBrokerStandaloneOptions QueueOptions +) + +type QueueOptions struct { + filer *string + port *int + tlsPrivateKey *string + tlsCertificate *string + defaultTtl *string +} + +func init() { + cmdMsgBroker.Run = runMsgBroker // break init cycle + messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") + messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port") + messageBrokerStandaloneOptions.tlsPrivateKey = cmdMsgBroker.Flag.String("key.file", "", "path to the TLS private key file") + messageBrokerStandaloneOptions.tlsCertificate = cmdMsgBroker.Flag.String("cert.file", "", "path to the TLS certificate file") + messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") +} + +var cmdMsgBroker = &Command{ + UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]", + Short: "<WIP> start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMsgBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + return messageBrokerStandaloneOptions.startQueueServer() + +} + +func (msgBrokerOpt *QueueOptions) startQueueServer() bool { + + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer) + if err != nil { + glog.Fatal(err) + return false + } + + filerQueuesPath := "/queues" + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + for { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + filerQueuesPath = resp.DirQueues + glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + break + } + } + + qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{ + Filers: []string{*msgBrokerOpt.filer}, + DefaultReplication: "", + MaxMB: 0, + Port: *msgBrokerOpt.port, + }) + + // start grpc listener + grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + queue_pb.RegisterSeaweedQueueServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/queue.go b/weed/command/queue.go deleted file mode 100644 index d09d5d8b3..000000000 --- a/weed/command/queue.go +++ /dev/null @@ -1,107 +0,0 @@ -package command - -import ( - "context" - "fmt" - "strconv" - "time" - - "google.golang.org/grpc/reflection" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" - "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - queueStandaloneOptions QueueOptions -) - -type QueueOptions struct { - filer *string - port *int - tlsPrivateKey *string - tlsCertificate *string - defaultTtl *string -} - -func init() { - cmdQueue.Run = runQueue // break init cycle - queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address") - queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port") - queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file") - queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file") - queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") -} - -var cmdQueue = &Command{ - UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]", - Short: "start a queue gRPC server that is backed by a filer", - Long: `start a queue gRPC server that is backed by a filer. - -`, -} - -func runQueue(cmd *Command, args []string) bool { - - util.LoadConfiguration("security", false) - - return queueStandaloneOptions.startQueueServer() - -} - -func (queueopt *QueueOptions) startQueueServer() bool { - - filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer) - if err != nil { - glog.Fatal(err) - return false - } - - filerQueuesPath := "/queues" - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - - for { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) - } - filerQueuesPath = resp.DirQueues - glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath) - return nil - }) - if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) - time.Sleep(time.Second) - } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress) - break - } - } - - qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{ - Filers: []string{*queueopt.filer}, - DefaultReplication: "", - MaxMB: 0, - Port: *queueopt.port, - }) - - // start grpc listener - grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err) - } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue")) - queue_pb.RegisterSeaweedQueueServer(grpcS, qs) - reflection.Register(grpcS) - go grpcS.Serve(grpcL) - - return true - -} diff --git a/weed/command/s3.go b/weed/command/s3.go index 39d0c04fc..cd4018fbc 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -6,6 +6,7 @@ import ( "net/http" "time" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" @@ -117,7 +118,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer) if err != nil { glog.Fatal(err) return false @@ -128,7 +129,7 @@ func (s3opt *S3Options) startS3Server() bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index fc7f8636d..5b246b7c0 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -326,7 +326,7 @@ key = "" cert = "" key = "" -[grpc.queue] +[grpc.msg_broker] cert = "" key = "" diff --git a/weed/command/volume.go b/weed/command/volume.go index 9d665d143..4773d8a55 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -13,6 +13,7 @@ import ( "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/httpdown" @@ -234,7 +235,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } - grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume")) volume_server_pb.RegisterVolumeServerServer(grpcS, vs) reflection.Register(grpcS) go func() { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 4d5752247..ba88a17be 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -8,6 +8,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" @@ -54,7 +55,7 @@ func runWebDav(cmd *Command, args []string) bool { func (wo *WebDavOption) startWebDav() bool { - filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer) + filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer) if err != nil { glog.Fatal(err) return false |
