aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-06-05 23:37:41 -0700
committerChris Lu <chris.lu@gmail.com>2018-06-05 23:37:41 -0700
commit299312c8057c5b96f67a8ac825ee026fe01dd8fc (patch)
treeec17f807d12adcb3fb4b8b35663258cfbeaeeb3f
parent95fe745a0cf98975172e5e8fc166647c48f52c80 (diff)
downloadseaweedfs-299312c8057c5b96f67a8ac825ee026fe01dd8fc.tar.xz
seaweedfs-299312c8057c5b96f67a8ac825ee026fe01dd8fc.zip
use separate filer grpc port
-rw-r--r--weed/command/filer.go24
-rw-r--r--weed/command/filer_copy.go60
-rw-r--r--weed/command/mount.go2
-rw-r--r--weed/command/mount_std.go23
-rw-r--r--weed/filesys/dirty_page.go2
-rw-r--r--weed/filesys/file.go6
-rw-r--r--weed/filesys/wfs.go10
7 files changed, 86 insertions, 41 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 1bd1493bd..2d4696828 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -10,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/soheilhy/cmux"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"strings"
@@ -24,6 +23,7 @@ type FilerOptions struct {
masters *string
ip *string
port *int
+ grpcPort *int
publicPort *int
collection *string
defaultReplicaPlacement *string
@@ -39,6 +39,7 @@ func init() {
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
+ f.grpcPort = cmdFiler.Flag.Int("port.grpc", 0, "filer grpc server listen port, default to http port + 10000")
f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
@@ -119,21 +120,22 @@ func (fo *FilerOptions) start() {
glog.Fatalf("Filer listener error: %v", e)
}
- m := cmux.New(filerListener)
- grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
- httpL := m.Match(cmux.Any())
-
- // Create your protocol servers.
+ // starting grpc server
+ grpcPort := *f.grpcPort
+ if grpcPort == 0 {
+ grpcPort = *f.port + 10000
+ }
+ grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
+ }
grpcS := grpc.NewServer()
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
-
- httpS := &http.Server{Handler: defaultMux}
-
go grpcS.Serve(grpcL)
- go httpS.Serve(httpL)
- if err := m.Serve(); err != nil {
+ httpS := &http.Server{Handler: defaultMux}
+ if err := httpS.Serve(filerListener); err != nil {
glog.Fatalf("Filer Fail to serve: %v", e)
}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 904aac76c..6bc3d4119 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -25,13 +25,14 @@ var (
)
type CopyOptions struct {
- master *string
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
- secretKey *string
+ filerGrpcPort *int
+ master *string
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ secretKey *string
secret security.Secret
}
@@ -45,6 +46,7 @@ func init() {
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
+ copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000")
copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
}
@@ -87,15 +89,33 @@ func runCopy(cmd *Command, args []string) bool {
urlPath = urlPath + "/"
}
+ if filerUrl.Port() == "" {
+ fmt.Printf("The filer port should be specified.\n")
+ return false
+ }
+
+ filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
+ if parseErr != nil {
+ fmt.Printf("The filer port parse error: %v\n", parseErr)
+ return false
+ }
+
+ filerGrpcPort := filerPort + 10000
+ if *copy.filerGrpcPort != 0 {
+ filerGrpcPort = uint64(*copy.filerGrpcPort)
+ }
+
+ filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+
for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) {
+ if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
return false
}
}
return true
}
-func doEachCopy(fileOrDir string, host string, path string) bool {
+func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
f, err := os.Open(fileOrDir)
if err != nil {
fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
@@ -113,7 +133,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") {
+ if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
return false
}
}
@@ -135,13 +155,13 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
}
if chunkCount == 1 {
- return uploadFileAsOne(host, path, f, fi)
+ return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
}
- return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize)
+ return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
}
-func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool {
+func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
// upload the file content
fileName := filepath.Base(f.Name())
@@ -183,10 +203,10 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn
Mtime: time.Now().UnixNano(),
})
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
}
- if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: urlFolder,
Entry: &filer_pb.Entry{
@@ -209,14 +229,14 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
+ fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
return false
}
return true
}
-func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -259,7 +279,7 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}
- if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: urlFolder,
Entry: &filer_pb.Entry{
@@ -282,11 +302,11 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil
}
return nil
}); err != nil {
- fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
+ fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err)
return false
}
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
return true
}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 6ba3b3697..df215674f 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -2,6 +2,7 @@ package command
type MountOptions struct {
filer *string
+ filerGrpcPort *int
dir *string
collection *string
replication *string
@@ -15,6 +16,7 @@ var (
func init() {
cmdMount.Run = runMount // break init cycle
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
+ mountOptions.filerGrpcPort = cmdMount.Flag.Int("filer.grpc.port", 0, "filer grpc server listen port, default to http port + 10000")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index d8b6884ff..f64dccb54 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -11,6 +11,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+ "strconv"
)
func runMount(cmd *Command, args []string) bool {
@@ -51,8 +53,27 @@ func runMount(cmd *Command, args []string) bool {
c.Close()
})
+ hostnameAndPort := strings.Split(*mountOptions.filer, ":")
+ if len(hostnameAndPort) != 2 {
+ fmt.Printf("The filer should have hostname:port format: %v\n", hostnameAndPort)
+ return false
+ }
+
+ filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
+ if parseErr != nil {
+ fmt.Printf("The filer filer port parse error: %v\n", parseErr)
+ return false
+ }
+
+ filerGrpcPort := filerPort + 10000
+ if *mountOptions.filerGrpcPort != 0 {
+ filerGrpcPort = uint64(*copy.filerGrpcPort)
+ }
+
+ filerAddress := fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort)
+
err = fs.Serve(c, filesys.NewSeaweedFileSystem(
- *mountOptions.filer, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB))
+ filerAddress, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB))
if err != nil {
fuse.Unmount(*mountOptions.dir)
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 996eb0abb..ca8c29b7a 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -133,7 +133,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
return nil
}); err != nil {
- return nil, fmt.Errorf("filer assign volume: %v", err)
+ return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 1fb7d53b1..625fd4f74 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -26,7 +26,7 @@ type File struct {
isOpen bool
}
-func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
+func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
fullPath := filepath.Join(file.dir.Path, file.Name)
@@ -45,7 +45,7 @@ func (file *File) Attr(context context.Context, attr *fuse.Attr) error {
ParentDir: file.dir.Path,
}
- resp, err := client.GetEntryAttributes(context, request)
+ resp, err := client.GetEntryAttributes(ctx, request)
if err != nil {
glog.V(0).Infof("file attr read file %v: %v", request, err)
return err
@@ -129,7 +129,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
- // write the file chunks to the filer
+ // write the file chunks to the filerGrpcAddress
glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
return nil
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 4b9e20b95..ac7333695 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -9,16 +9,16 @@ import (
)
type WFS struct {
- filer string
+ filerGrpcAddress string
listDirectoryEntriesCache *ccache.Cache
collection string
replication string
chunkSizeLimit int64
}
-func NewSeaweedFileSystem(filer string, collection string, replication string, chunkSizeLimitMB int) *WFS {
+func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replication string, chunkSizeLimitMB int) *WFS {
return &WFS{
- filer: filer,
+ filerGrpcAddress: filerGrpcAddress,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)),
collection: collection,
replication: replication,
@@ -32,9 +32,9 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := grpc.Dial(wfs.filer, grpc.WithInsecure())
+ grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure())
if err != nil {
- return fmt.Errorf("fail to dial %s: %v", wfs.filer, err)
+ return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err)
}
defer grpcConnection.Close()