aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-04 00:39:47 -0800
committerChris Lu <chris.lu@gmail.com>2020-03-04 00:39:47 -0800
commitf90c43635d96cace1ab1ca965a56a082f880aa4b (patch)
tree579d389f3dad44b4784b1dcf1a3210d82d4f091a
parentbd5c0a13e8c6456ed0b8f586726c4df4967d963a (diff)
downloadseaweedfs-f90c43635d96cace1ab1ca965a56a082f880aa4b.tar.xz
seaweedfs-f90c43635d96cace1ab1ca965a56a082f880aa4b.zip
refactoring
-rw-r--r--weed/command/command.go2
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/filer_copy.go20
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/mount.go21
-rw-r--r--weed/command/mount_std.go7
-rw-r--r--weed/command/msg_broker.go111
-rw-r--r--weed/command/queue.go107
-rw-r--r--weed/command/s3.go5
-rw-r--r--weed/command/scaffold.go2
-rw-r--r--weed/command/volume.go3
-rw-r--r--weed/command/webdav.go3
-rw-r--r--weed/filesys/wfs.go4
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/pb/grpc_client_server.go (renamed from weed/util/grpc_client_server.go)55
-rw-r--r--weed/replication/sink/filersink/fetch_write.go4
-rw-r--r--weed/replication/source/filer_source.go3
-rw-r--r--weed/s3api/s3api_handlers.go4
-rw-r--r--weed/server/msg_broker_grpc_server.go23
-rw-r--r--weed/server/msg_broker_server.go121
-rw-r--r--weed/server/queue_server.go49
-rw-r--r--weed/server/raft_server.go11
-rw-r--r--weed/server/volume_grpc_client_to_master.go5
-rw-r--r--weed/server/webdav_server.go3
-rw-r--r--weed/shell/command_fs_du.go9
-rw-r--r--weed/wdclient/masterclient.go21
26 files changed, 361 insertions, 246 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 6687469f1..9dc51e922 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -20,7 +20,7 @@ var Commands = []*Command{
cmdS3,
cmdUpload,
cmdDownload,
- cmdQueue,
+ cmdMsgBroker,
cmdScaffold,
cmdShell,
cmdVersion,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 31e65acea..b5b595215 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -9,6 +9,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -144,7 +145,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 18f41048b..3e7ae1db2 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -17,6 +17,7 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -159,7 +160,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -274,7 +275,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if task.fileSize > 0 {
// assign a volume
- err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -319,7 +320,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -375,7 +376,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
- err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -447,7 +448,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return uploadError
}
- if err := withFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -496,12 +497,3 @@ func detectMimeType(f *os.File) string {
mimeType := http.DetectContentType(head[:n])
return mimeType
}
-
-func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return util.WithCachedGrpcClient(func(clientConn *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(clientConn)
- return fn(client)
- }, filerAddress, grpcDialOption)
-
-}
diff --git a/weed/command/master.go b/weed/command/master.go
index c4b11119b..1be60426f 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -129,7 +130,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 4bdb3415a..e73cbee10 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,11 +1,5 @@
package command
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
type MountOptions struct {
filer *string
filerMountRootPath *string
@@ -69,18 +63,3 @@ var cmdMount = &Command{
`,
}
-func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("filer port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
-}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index e8e3fb030..b195bf143 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -17,6 +17,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -135,16 +136,16 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
})
// parse filer grpc address
- filerGrpcAddress, err := parseFilerGrpcAddress(filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
if err != nil {
- glog.V(0).Infof("parseFilerGrpcAddress: %v", err)
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
daemonize.SignalOutcome(err)
return true
}
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
new file mode 100644
index 000000000..0d69a9a66
--- /dev/null
+++ b/weed/command/msg_broker.go
@@ -0,0 +1,111 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ messageBrokerStandaloneOptions QueueOptions
+)
+
+type QueueOptions struct {
+ filer *string
+ port *int
+ tlsPrivateKey *string
+ tlsCertificate *string
+ defaultTtl *string
+}
+
+func init() {
+ cmdMsgBroker.Run = runMsgBroker // break init cycle
+ messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
+ messageBrokerStandaloneOptions.tlsPrivateKey = cmdMsgBroker.Flag.String("key.file", "", "path to the TLS private key file")
+ messageBrokerStandaloneOptions.tlsCertificate = cmdMsgBroker.Flag.String("cert.file", "", "path to the TLS certificate file")
+ messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+}
+
+var cmdMsgBroker = &Command{
+ UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
+ Short: "<WIP> start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMsgBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return messageBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ filerQueuesPath := "/queues"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerQueuesPath = resp.DirQueues
+ glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
+ Filers: []string{*msgBrokerOpt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Port: *msgBrokerOpt.port,
+ })
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/queue.go b/weed/command/queue.go
deleted file mode 100644
index d09d5d8b3..000000000
--- a/weed/command/queue.go
+++ /dev/null
@@ -1,107 +0,0 @@
-package command
-
-import (
- "context"
- "fmt"
- "strconv"
- "time"
-
- "google.golang.org/grpc/reflection"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- queueStandaloneOptions QueueOptions
-)
-
-type QueueOptions struct {
- filer *string
- port *int
- tlsPrivateKey *string
- tlsCertificate *string
- defaultTtl *string
-}
-
-func init() {
- cmdQueue.Run = runQueue // break init cycle
- queueStandaloneOptions.filer = cmdQueue.Flag.String("filer", "localhost:8888", "filer server address")
- queueStandaloneOptions.port = cmdQueue.Flag.Int("port", 17777, "queue server gRPC listen port")
- queueStandaloneOptions.tlsPrivateKey = cmdQueue.Flag.String("key.file", "", "path to the TLS private key file")
- queueStandaloneOptions.tlsCertificate = cmdQueue.Flag.String("cert.file", "", "path to the TLS certificate file")
- queueStandaloneOptions.defaultTtl = cmdQueue.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
-}
-
-var cmdQueue = &Command{
- UsageLine: "<WIP> queue [-port=17777] [-filer=<ip:port>]",
- Short: "start a queue gRPC server that is backed by a filer",
- Long: `start a queue gRPC server that is backed by a filer.
-
-`,
-}
-
-func runQueue(cmd *Command, args []string) bool {
-
- util.LoadConfiguration("security", false)
-
- return queueStandaloneOptions.startQueueServer()
-
-}
-
-func (queueopt *QueueOptions) startQueueServer() bool {
-
- filerGrpcAddress, err := parseFilerGrpcAddress(*queueopt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
-
- filerQueuesPath := "/queues"
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
- for {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
- }
- filerQueuesPath = resp.DirQueues
- glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
- return nil
- })
- if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
- time.Sleep(time.Second)
- } else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *queueopt.filer, filerGrpcAddress)
- break
- }
- }
-
- qs, err := weed_server.NewQueueServer(&weed_server.QueueServerOption{
- Filers: []string{*queueopt.filer},
- DefaultReplication: "",
- MaxMB: 0,
- Port: *queueopt.port,
- })
-
- // start grpc listener
- grpcL, err := util.NewListener(":"+strconv.Itoa(*queueopt.port), 0)
- if err != nil {
- glog.Fatalf("failed to listen on grpc port %d: %v", *queueopt.port, err)
- }
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.queue"))
- queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
- reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- return true
-
-}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 39d0c04fc..cd4018fbc 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -6,6 +6,7 @@ import (
"net/http"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -117,7 +118,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
@@ -128,7 +129,7 @@ func (s3opt *S3Options) startS3Server() bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err = withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index fc7f8636d..5b246b7c0 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -326,7 +326,7 @@ key = ""
cert = ""
key = ""
-[grpc.queue]
+[grpc.msg_broker]
cert = ""
key = ""
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 9d665d143..4773d8a55 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -13,6 +13,7 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util/httpdown"
@@ -234,7 +235,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
go func() {
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 4d5752247..ba88a17be 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -54,7 +55,7 @@ func runWebDav(cmd *Command, args []string) bool {
func (wo *WebDavOption) startWebDav() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
if err != nil {
glog.Fatal(err)
return false
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index aa530f6aa..83826fed5 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -14,8 +14,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
@@ -93,7 +93,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- err := util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 7eed66503..dccf85da4 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -8,9 +8,9 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
@@ -20,7 +20,7 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption,
return err
}
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, grpcAddress, grpcDialOption)
@@ -39,12 +39,12 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
if parseErr != nil {
return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress, grpcDialOption)
diff --git a/weed/util/grpc_client_server.go b/weed/pb/grpc_client_server.go
index d6a9ee3c3..4b5f9eff3 100644
--- a/weed/util/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -1,4 +1,4 @@
-package util
+package pb
import (
"context"
@@ -11,6 +11,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
var (
@@ -127,3 +130,53 @@ func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
}
+
+func WithMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
+
+ masterGrpcAddress, parseErr := ParseServerToGrpcAddress(master)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
+ }
+
+ return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := master_pb.NewSeaweedClient(grpcConnection)
+ return fn(client)
+ }, masterGrpcAddress, grpcDialOption)
+
+}
+
+func WithFilerClient(filer string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress, parseErr := ParseServerToGrpcAddress(filer)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse filer grpc %v: %v", filer, parseErr)
+ }
+
+ return WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, fn)
+
+}
+
+func WithGrpcFilerClient(filerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
+
+ return WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, grpcDialOption)
+
+}
+
+func ParseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
+ hostnameAndPort := strings.Split(filer, ":")
+ if len(hostnameAndPort) != 2 {
+ return "", fmt.Errorf("filer should have hostname:port format: %v", hostnameAndPort)
+ }
+
+ filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ if parseErr != nil {
+ return "", fmt.Errorf("filer port parse error: %v", parseErr)
+ }
+
+ filerGrpcPort := int(filerPort) + 10000
+
+ return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
+}
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 954e951c9..232b68fec 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -10,9 +10,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir string) (replicatedChunks []*filer_pb.FileChunk, err error) {
@@ -111,7 +111,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, dir string)
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 11eb3afa1..90bcffdf0 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -9,6 +9,7 @@ import (
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -92,7 +93,7 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, fs.grpcDialOption)
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 81a260a63..d7212d5e3 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -12,8 +12,8 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
type mimeType string
@@ -40,7 +40,7 @@ func encodeResponse(response interface{}) []byte {
func (s3a *S3ApiServer) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go
new file mode 100644
index 000000000..8b13aac76
--- /dev/null
+++ b/weed/server/msg_broker_grpc_server.go
@@ -0,0 +1,23 @@
+package weed_server
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+)
+
+func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
+ panic("implement me")
+}
diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go
new file mode 100644
index 000000000..a9d908581
--- /dev/null
+++ b/weed/server/msg_broker_server.go
@@ -0,0 +1,121 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessageBrokerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Port int
+}
+
+type MessageBroker struct {
+ option *MessageBrokerOption
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) {
+
+ messageBroker = &MessageBroker{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"),
+ }
+
+ go messageBroker.loopForEver()
+
+ return messageBroker, nil
+}
+
+func (broker *MessageBroker) loopForEver() {
+
+ for {
+ broker.checkPeers()
+ time.Sleep(3 * time.Second)
+ }
+
+}
+
+func (broker *MessageBroker) checkPeers() {
+
+ // contact a filer about masters
+ var masters []string
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+ // contact each masters for filers
+ var filers []string
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to list filers: %v\n", err)
+ return
+ }
+ }
+
+ // contact each filer about brokers
+ for _, filer := range filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+}
+
+func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
+
+}
diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go
deleted file mode 100644
index 078c76a30..000000000
--- a/weed/server/queue_server.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package weed_server
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type QueueServerOption struct {
- Filers []string
- DefaultReplication string
- MaxMB int
- Port int
-}
-
-type QueueServer struct {
- option *QueueServerOption
- grpcDialOption grpc.DialOption
-}
-
-func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
- panic("implement me")
-}
-
-func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
- panic("implement me")
-}
-
-func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
- panic("implement me")
-}
-
-func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) {
-
- qs = &QueueServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"),
- }
-
- return qs, nil
-}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 53289f1c1..0381c7feb 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,8 +2,6 @@ package weed_server
import (
"encoding/json"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
"io/ioutil"
"os"
"path"
@@ -11,7 +9,12 @@ import (
"sort"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
+ s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
@@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 2168afee7..1f4d9df10 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -36,7 +37,7 @@ func (vs *VolumeServer) heartbeat() {
if newLeader != "" {
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
if parseErr != nil {
glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
@@ -55,7 +56,7 @@ func (vs *VolumeServer) heartbeat() {
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index ddd611724..a07f6be01 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -14,6 +14,7 @@ import (
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -98,7 +99,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index 6c31ebdff..ca2f22b57 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -4,11 +4,9 @@ import (
"fmt"
"io"
- "google.golang.org/grpc"
-
"github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -82,10 +80,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir,
func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, env.option.GrpcDialOption)
+ return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn)
}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 0cf161a63..301f20615 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,15 +2,14 @@ package wdclient
import (
"context"
- "fmt"
"math/rand"
"time"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
type MasterClient struct {
@@ -67,7 +66,7 @@ func (mc *MasterClient) tryAllMasters() {
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
glog.V(1).Infof("%s Connecting to master %v", mc.name, master)
- gprcErr := withMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
@@ -119,22 +118,8 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
return
}
-func withMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
-
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
- if parseErr != nil {
- return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
- }
-
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := master_pb.NewSeaweedClient(grpcConnection)
- return fn(client)
- }, masterGrpcAddress, grpcDialOption)
-
-}
-
func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
- return withMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})
}