aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/repeated_vacuum/repeated_vacuum.go8
-rw-r--r--weed/command/backup.go11
-rw-r--r--weed/command/benchmark.go14
-rw-r--r--weed/command/filer.go6
-rw-r--r--weed/command/filer_copy.go49
-rw-r--r--weed/command/filer_replication.go1
-rw-r--r--weed/command/master.go7
-rw-r--r--weed/command/mount_std.go7
-rw-r--r--weed/command/s3.go6
-rw-r--r--weed/command/scaffold.go26
-rw-r--r--weed/command/server.go8
-rw-r--r--weed/command/upload.go11
-rw-r--r--weed/command/volume.go7
-rw-r--r--weed/filer2/filer.go7
-rw-r--r--weed/filer2/filer_deletion.go4
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go4
-rw-r--r--weed/filer2/memdb/memdb_store_test.go4
-rw-r--r--weed/filesys/filehandle.go5
-rw-r--r--weed/filesys/wfs.go3
-rw-r--r--weed/filesys/wfs_deletion.go2
-rw-r--r--weed/operation/assign_file_id.go5
-rw-r--r--weed/operation/chunked_file.go5
-rw-r--r--weed/operation/delete_content.go15
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/operation/lookup.go5
-rw-r--r--weed/operation/stats.go5
-rw-r--r--weed/operation/submit.go19
-rw-r--r--weed/operation/sync_volume.go9
-rw-r--r--weed/replication/sink/filersink/fetch_write.go2
-rw-r--r--weed/replication/sink/filersink/filer_sink.go19
-rw-r--r--weed/replication/source/filer_source.go15
-rw-r--r--weed/s3api/s3api_handlers.go2
-rw-r--r--weed/s3api/s3api_server.go2
-rw-r--r--weed/security/tls.go66
-rw-r--r--weed/server/common.go5
-rw-r--r--weed/server/filer_grpc_server.go4
-rw-r--r--weed/server/filer_server.go13
-rw-r--r--weed/server/filer_server_handlers_write.go2
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server.go7
-rw-r--r--weed/server/master_server_handlers.go2
-rw-r--r--weed/server/master_server_handlers_admin.go10
-rw-r--r--weed/server/volume_grpc_client_to_master.go11
-rw-r--r--weed/server/volume_server.go18
-rw-r--r--weed/server/volume_server_handlers_write.go2
-rw-r--r--weed/storage/volume_sync.go21
-rw-r--r--weed/topology/allocate_volume.go5
-rw-r--r--weed/topology/topology_event_handling.go5
-rw-r--r--weed/topology/topology_vacuum.go31
-rw-r--r--weed/topology/volume_growth.go17
-rw-r--r--weed/util/grpc_client_server.go4
-rw-r--r--weed/wdclient/masterclient.go29
-rw-r--r--weed/wdclient/wdclient.go15
53 files changed, 382 insertions, 188 deletions
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 95201f3e8..90bdeb5e8 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -4,6 +4,9 @@ import (
"bytes"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
"log"
"math/rand"
@@ -19,8 +22,11 @@ var (
func main() {
flag.Parse()
+ weed_server.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+
for i := 0; i < *repeat; i++ {
- assignResult, err := operation.Assign(*master, &operation.VolumeAssignRequest{Count: 1})
+ assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{Count: 1})
if err != nil {
log.Fatalf("assign: %v", err)
}
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 0641f2e5d..86391f9c4 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -2,6 +2,9 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -46,6 +49,10 @@ var cmdBackup = &Command{
}
func runBackup(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+
if *s.volumeId == -1 {
return false
}
@@ -59,7 +66,7 @@ func runBackup(cmd *Command, args []string) bool {
}
volumeServer := lookup.Locations[0].Url
- stats, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
+ stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
fmt.Printf("Error get volume %d status: %v\n", vid, err)
return true
@@ -81,7 +88,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
- if err := v.Synchronize(volumeServer); err != nil {
+ if err := v.Synchronize(volumeServer, grpcDialOption); err != nil {
fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
return true
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 93359b243..44601e567 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -4,6 +4,9 @@ import (
"bufio"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"io"
"math"
"math/rand"
@@ -35,6 +38,7 @@ type BenchmarkOptions struct {
collection *string
cpuprofile *string
maxCpu *int
+ grpcDialOption grpc.DialOption
}
var (
@@ -101,6 +105,10 @@ var (
)
func runBenchmark(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+ b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
@@ -115,7 +123,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ","))
+ masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
go masterClient.KeepConnectedToMaster()
masterClient.WaitUntilConnected()
@@ -223,12 +231,12 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Count: 1,
Collection: *b.collection,
}
- if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil {
+ if assignResult, err := operation.Assign(masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, masterClient.GetMaster(), assignResult.Auth); err == nil {
+ if _, err := fp.Upload(0, masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a07a67471..478b7d6bf 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -1,6 +1,8 @@
package command
import (
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
"net/http"
"strconv"
"strings"
@@ -75,6 +77,8 @@ var cmdFiler = &Command{
func runFiler(cmd *Command, args []string) bool {
+ weed_server.LoadConfiguration("security", false)
+
f.startFiler()
return true
@@ -141,7 +145,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer()
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 39d83c31e..650757442 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -2,6 +2,10 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"io/ioutil"
"net/url"
"os"
@@ -23,13 +27,14 @@ var (
)
type CopyOptions struct {
- filerGrpcPort *int
- master *string
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
+ filerGrpcPort *int
+ master *string
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ grpcDialOption grpc.DialOption
}
func init() {
@@ -61,6 +66,9 @@ var cmdCopy = &Command{
}
func runCopy(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+
if len(args) <= 1 {
return false
}
@@ -95,16 +103,17 @@ func runCopy(cmd *Command, args []string) bool {
}
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) {
+ if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) {
return false
}
}
return true
}
-func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool {
+func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool {
f, err := os.Open(fileOrDir)
if err != nil {
fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
@@ -122,7 +131,7 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path st
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") {
+ if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") {
return false
}
}
@@ -144,13 +153,13 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path st
}
if chunkCount == 1 {
- return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi)
+ return uploadFileAsOne(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi)
}
- return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize)
+ return uploadFileInChunks(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize)
}
-func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool {
+func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool {
// upload the file content
fileName := filepath.Base(f.Name())
@@ -161,7 +170,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f
if fi.Size() > 0 {
// assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
+ assignResult, err := operation.Assign(*copy.master, grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *copy.replication,
Collection: *copy.collection,
@@ -195,7 +204,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f
fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName)
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: urlFolder,
Entry: &filer_pb.Entry{
@@ -228,7 +237,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f
return true
}
-func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -238,7 +247,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string,
for i := int64(0); i < int64(chunkCount); i++ {
// assign a volume
- assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
+ assignResult, err := operation.Assign(*copy.master, grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *copy.replication,
Collection: *copy.collection,
@@ -272,7 +281,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string,
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}
- if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error {
+ if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: urlFolder,
Entry: &filer_pb.Entry{
@@ -323,9 +332,9 @@ func detectMimeType(f *os.File) string {
return mimeType
}
-func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
+func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(filerAddress)
+ grpcConnection, err := util.GrpcDial(filerAddress, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index c24f63bf0..c9afbdc8a 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -35,6 +35,7 @@ var cmdFilerReplicate = &Command{
func runFilerReplicate(cmd *Command, args []string) bool {
+ weed_server.LoadConfiguration("security", false)
weed_server.LoadConfiguration("replication", true)
weed_server.LoadConfiguration("notification", true)
config := viper.GetViper()
diff --git a/weed/command/master.go b/weed/command/master.go
index 6f1373aa2..5b45c9627 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,6 +1,8 @@
package command
import (
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
"net/http"
"os"
"runtime"
@@ -54,6 +56,9 @@ var (
)
func runMaster(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+
if *mMaxCpu < 1 {
*mMaxCpu = runtime.NumCPU()
}
@@ -104,7 +109,7 @@ func runMaster(cmd *Command, args []string) bool {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer()
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 2937b9ef1..3e4249bfc 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -4,6 +4,9 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
"os"
"os/user"
"runtime"
@@ -19,6 +22,9 @@ import (
)
func runMount(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *mountOptions.dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
@@ -91,6 +97,7 @@ func runMount(cmd *Command, args []string) bool {
err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{
FilerGrpcAddress: filerGrpcAddress,
+ GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
FilerMountRootPath: mountRoot,
Collection: *mountOptions.collection,
Replication: *mountOptions.replication,
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 16a9490ff..a54ddd2f7 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,6 +1,9 @@
package command
import (
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
"net/http"
"time"
@@ -46,6 +49,8 @@ var cmdS3 = &Command{
func runS3(cmd *Command, args []string) bool {
+ weed_server.LoadConfiguration("security", false)
+
filerGrpcAddress, err := parseFilerGrpcAddress(*s3options.filer, *s3options.filerGrpcPort)
if err != nil {
glog.Fatal(err)
@@ -59,6 +64,7 @@ func runS3(cmd *Command, args []string) bool {
FilerGrpcAddress: filerGrpcAddress,
DomainName: *s3options.domainName,
BucketsPath: *s3options.filerBucketsPath,
+ GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 22300d3ba..e8608e9dd 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -250,8 +250,34 @@ directory = "/" # destination directory
# /etc/seaweedfs/security.toml
# this file is read by master, volume server, and filer
+# the jwt signing key is read by master and volume server
+# a jwt expires in 10 seconds
[jwt.signing]
key = ""
+# volume server also uses grpc that should be secured.
+
+# all grpc tls authentications are mutual
+[grpc]
+ca = ""
+
+[grpc.volume]
+cert = ""
+key = ""
+
+[grpc.master]
+cert = ""
+key = ""
+
+[grpc.filer]
+cert = ""
+key = ""
+
+# use this for any place needs a grpc client
+# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
+[grpc.client]
+cert = ""
+key = ""
+
`
)
diff --git a/weed/command/server.go b/weed/command/server.go
index 2dd506772..a9415d068 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -1,6 +1,8 @@
package command
import (
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
"net/http"
"os"
"runtime"
@@ -95,6 +97,9 @@ func init() {
}
func runServer(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+
if *serverOptions.cpuprofile != "" {
f, err := os.Create(*serverOptions.cpuprofile)
if err != nil {
@@ -188,7 +193,8 @@ func runServer(cmd *Command, args []string) bool {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer()
+ glog.V(0).Infof("grpc config %+v", viper.Sub("grpc"))
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
reflection.Register(grpcS)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index df2cb9892..80fc635c1 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -3,6 +3,9 @@ package command
import (
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
"os"
"path/filepath"
@@ -57,6 +60,10 @@ var cmdUpload = &Command{
}
func runUpload(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+
if len(args) == 0 {
if *upload.dir == "" {
return false
@@ -73,7 +80,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*upload.master, parts,
+ results, e := operation.SubmitFiles(*upload.master, grpcDialOption, parts,
*upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB)
bytes, _ := json.Marshal(results)
@@ -92,7 +99,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*upload.master, parts,
+ results, _ := operation.SubmitFiles(*upload.master, grpcDialOption, parts,
*upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB)
bytes, _ := json.Marshal(results)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 27a075b5b..32ec7819b 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,8 @@
package command
import (
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
"net/http"
"os"
"runtime"
@@ -78,6 +80,9 @@ var (
)
func runVolume(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("security", false)
+
if *v.maxCpu < 1 {
*v.maxCpu = runtime.NumCPU()
}
@@ -185,7 +190,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer()
+ grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 1ee2f5ede..672c6201c 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -3,6 +3,7 @@ package filer2
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"math"
"os"
"path/filepath"
@@ -24,13 +25,15 @@ type Filer struct {
directoryCache *ccache.Cache
MasterClient *wdclient.MasterClient
fileIdDeletionChan chan string
+ GrpcDialOption grpc.DialOption
}
-func NewFiler(masters []string) *Filer {
+func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
f := &Filer{
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
fileIdDeletionChan: make(chan string, 4096),
+ GrpcDialOption: grpcDialOption,
}
go f.loopProcessingDeletion()
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 8fe8ae04f..c7d02657d 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -38,13 +38,13 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = append(fileIds, fid)
if len(fileIds) >= 4096 {
glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
fileIds = fileIds[:0]
}
case <-ticker.C:
if len(fileIds) > 0 {
glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
fileIds = fileIds[:0]
}
}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 5b214558f..4d600e0bf 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -8,7 +8,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
@@ -61,7 +61,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
index cf813e04b..31da8998f 100644
--- a/weed/filer2/memdb/memdb_store_test.go
+++ b/weed/filer2/memdb/memdb_store_test.go
@@ -6,7 +6,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)
@@ -43,7 +43,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler(nil)
+ filer := filer2.NewFiler(nil, nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0f6ca1164..3bca0e22e 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -230,7 +231,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
})
}
-func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string
for _, fileId := range fileIds {
@@ -267,7 +268,7 @@ func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, file
return m, err
}
- _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
return err
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 6778d7b31..f7383582d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -19,6 +19,7 @@ import (
type Option struct {
FilerGrpcAddress string
+ GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
@@ -77,7 +78,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, wfs.option.FilerGrpcAddress)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index b96b27ca6..90058d75a 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -16,7 +16,7 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
}
wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(context.Background(), client, fileIds)
+ deleteFileIds(context.Background(), wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index acadc88c8..7e7a9059d 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"strings"
"time"
@@ -30,7 +31,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -44,7 +45,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
continue
}
- lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ lastError = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 9d8267dee..f3f6e7b00 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"io"
"net/http"
"sort"
@@ -69,12 +70,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string) error {
+func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, fileIds)
+ results, err := DeleteFiles(master, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 57fc0329e..1df95211e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -28,17 +29,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, vids)
+ return LookupVolumeIds(master, grpcDialOption, vids)
}
- return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
-func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -92,7 +93,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
} else {
ret = append(ret, deleteResults...)
@@ -106,9 +107,9 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index d0931a8d3..a02844657 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -18,7 +18,7 @@ var (
grpcClientsLock sync.Mutex
)
-func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
@@ -28,7 +28,7 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress)
+ }, grpcAddress, grpcDialOption)
}
@@ -42,7 +42,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
+func withMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0)
if parseErr != nil {
@@ -52,6 +52,6 @@ func withMasterServerClient(masterServer string, fn func(masterClient master_pb.
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress)
+ }, masterGrpcAddress, grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 562a11580..c4040f3e7 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
@@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -98,7 +99,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
//only query unknown_vids
- err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err := withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
index 364727272..9f7166864 100644
--- a/weed/operation/stats.go
+++ b/weed/operation/stats.go
@@ -2,14 +2,15 @@ package operation
import (
"context"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
-func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
+func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
- err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 374927495..bdf59d966 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -36,7 +37,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
@@ -49,7 +50,7 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -65,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, ret.Auth)
+ results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -108,7 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -136,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -149,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -170,7 +171,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -185,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index e40c7de41..bf81415c9 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"io"
"time"
@@ -11,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -26,9 +27,9 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
return
}
-func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
+func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
- return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index e632164a4..f1306ca4c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -105,7 +105,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId stri
func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(fs.grpcAddress)
+ grpcConnection, err := util.GrpcDial(fs.grpcAddress, fs.grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 2e9cc86d1..2eb326b83 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,6 +3,9 @@ package filersink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -13,13 +16,14 @@ import (
)
type FilerSink struct {
- filerSource *source.FilerSource
- grpcAddress string
- dir string
- replication string
- collection string
- ttlSec int32
- dataCenter string
+ filerSource *source.FilerSource
+ grpcAddress string
+ dir string
+ replication string
+ collection string
+ ttlSec int32
+ dataCenter string
+ grpcDialOption grpc.DialOption
}
func init() {
@@ -55,6 +59,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
fs.replication = replication
fs.collection = collection
fs.ttlSec = int32(ttlSec)
+ fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
return nil
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index efe71e706..92c2d203d 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -3,6 +3,9 @@ package source
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"io"
"net/http"
"strings"
@@ -17,8 +20,9 @@ type ReplicationSource interface {
}
type FilerSource struct {
- grpcAddress string
- Dir string
+ grpcAddress string
+ grpcDialOption grpc.DialOption
+ Dir string
}
func (fs *FilerSource) Initialize(configuration util.Configuration) error {
@@ -31,6 +35,7 @@ func (fs *FilerSource) Initialize(configuration util.Configuration) error {
func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) {
fs.grpcAddress = grpcAddress
fs.Dir = dir
+ fs.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
return nil
}
@@ -40,7 +45,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
vid := volumeId(part)
- err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.withFilerClient(fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
glog.V(4).Infof("read lookup volume id locations: %v", vid)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
@@ -84,9 +89,9 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
return filename, header, readCloser, err
}
-func (fs *FilerSource) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *FilerSource) withFilerClient(grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(fs.grpcAddress)
+ grpcConnection, err := util.GrpcDial(fs.grpcAddress, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err)
}
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 286398310..5d92085cc 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -37,7 +37,7 @@ func encodeResponse(response interface{}) []byte {
func (s3a *S3ApiServer) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(s3a.option.FilerGrpcAddress)
+ grpcConnection, err := util.GrpcDial(s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", s3a.option.FilerGrpcAddress, err)
}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index db798a546..24458592d 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -8,6 +8,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/gorilla/mux"
+ "google.golang.org/grpc"
"net/http"
)
@@ -16,6 +17,7 @@ type S3ApiServerOption struct {
FilerGrpcAddress string
DomainName string
BucketsPath string
+ GrpcDialOption grpc.DialOption
}
type S3ApiServer struct {
diff --git a/weed/security/tls.go b/weed/security/tls.go
new file mode 100644
index 000000000..e81ba4831
--- /dev/null
+++ b/weed/security/tls.go
@@ -0,0 +1,66 @@
+package security
+
+import (
+ "crypto/tls"
+ "crypto/x509"
+ "github.com/spf13/viper"
+ "io/ioutil"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+)
+
+func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption {
+ if config == nil {
+ return nil
+ }
+
+ // load cert/key, ca cert
+ cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
+ if err != nil {
+ glog.Errorf("load cert/key error: %v", err)
+ return nil
+ }
+ caCert, err := ioutil.ReadFile(config.GetString("ca"))
+ if err != nil {
+ glog.Errorf("read ca cert file error: %v", err)
+ return nil
+ }
+ caCertPool := x509.NewCertPool()
+ caCertPool.AppendCertsFromPEM(caCert)
+ ta := credentials.NewTLS(&tls.Config{
+ Certificates: []tls.Certificate{cert},
+ ClientCAs: caCertPool,
+ ClientAuth: tls.RequireAndVerifyClientCert,
+ })
+
+ return grpc.Creds(ta)
+}
+
+func LoadClientTLS(config *viper.Viper, component string) grpc.DialOption {
+ if config == nil {
+ return grpc.WithInsecure()
+ }
+
+ // load cert/key, cacert
+ cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
+ if err != nil {
+ glog.Errorf("load cert/key error: %v", err)
+ return grpc.WithInsecure()
+ }
+ caCert, err := ioutil.ReadFile(config.GetString("ca"))
+ if err != nil {
+ glog.Errorf("read ca cert file error: %v", err)
+ return grpc.WithInsecure()
+ }
+ caCertPool := x509.NewCertPool()
+ caCertPool.AppendCertsFromPEM(caCert)
+
+ ta := credentials.NewTLS(&tls.Config{
+ Certificates: []tls.Certificate{cert},
+ RootCAs: caCertPool,
+ InsecureSkipVerify: true,
+ })
+ return grpc.WithTransportCredentials(ta)
+}
diff --git a/weed/server/common.go b/weed/server/common.go
index c9f17aa86..1c75d44cf 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"net/http"
"path/filepath"
"strconv"
@@ -81,7 +82,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
-func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
+func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -111,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
}
- assignResult, ae := operation.Assign(masterUrl, ar)
+ assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 9a83ee1a6..4f1377331 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -220,7 +220,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
DataCenter: "",
}
}
- assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest)
+ assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
return nil, fmt.Errorf("assign volume: %v", err)
}
@@ -254,7 +254,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Ttl: req.Ttl,
}
- output, err := operation.Statistics(fs.filer.GetMaster(), input)
+ output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input)
if err != nil {
return nil, err
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c3c5072d0..2ace0a7ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "google.golang.org/grpc"
"net/http"
"os"
@@ -34,22 +35,24 @@ type FilerOption struct {
}
type FilerServer struct {
- option *FilerOption
- secret security.SigningKey
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
go fs.filer.KeepConnectedToMaster()
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 7cdbddde2..9e231c645 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -51,7 +51,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
}
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 6e9cd512d..13f8b37d1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -76,7 +76,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 06c959b92..a44a567d6 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
@@ -37,6 +38,8 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOpiton grpc.DialOption
}
func NewMasterServer(r *mux.Router, port int, metaFolder string,
@@ -48,7 +51,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
whiteList []string,
) *MasterServer {
- LoadConfiguration("security", false)
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -64,6 +66,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
@@ -89,7 +92,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate)
return ms
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index c4149e0cf..5bdb448c1 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -93,7 +93,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 3a2662908..eccf3ee4c 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -24,7 +24,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -60,7 +60,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(gcThreshold, ms.preallocate)
+ ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate)
ms.dirStatusHandler(w, r)
}
@@ -76,7 +76,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
+ count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -126,13 +126,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r))
+ submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl)
+ submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton)
}
}
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 25e9b1677..38603e4b6 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,6 +2,9 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -19,6 +22,8 @@ func (vs *VolumeServer) heartbeat() {
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+
var err error
var newLeader string
for {
@@ -31,7 +36,7 @@ func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress)
continue
}
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
@@ -40,9 +45,9 @@ func (vs *VolumeServer) heartbeat() {
}
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(masterGrpcAddress)
+ grpcConection, err := util.GrpcDial(masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index d8ff01766..8e77ec570 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "google.golang.org/grpc"
"net/http"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,13 +11,14 @@ import (
)
type VolumeServer struct {
- MasterNodes []string
- currentMaster string
- pulseSeconds int
- dataCenter string
- rack string
- store *storage.Store
- guard *security.Guard
+ MasterNodes []string
+ currentMaster string
+ pulseSeconds int
+ dataCenter string
+ rack string
+ store *storage.Store
+ guard *security.Guard
+ grpcDialOption grpc.DialOption
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
@@ -33,7 +35,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fixJpgOrientation bool,
readRedirect bool) *VolumeServer {
- LoadConfiguration("security", false)
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
enableUiAccess := v.GetBool("access.ui")
@@ -45,6 +46,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
}
vs.MasterNodes = masterNodes
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 1cfd9187e..6b78cea40 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -95,7 +95,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
// make sure all chunks had deleted before delete manifest
- if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
+ if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
diff --git a/weed/storage/volume_sync.go b/weed/storage/volume_sync.go
index 137a9b4ca..8d90a729d 100644
--- a/weed/storage/volume_sync.go
+++ b/weed/storage/volume_sync.go
@@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"io"
"os"
"sort"
@@ -45,12 +46,12 @@ optimized more later).
*/
-func (v *Volume) Synchronize(volumeServer string) (err error) {
+func (v *Volume) Synchronize(volumeServer string, grpcDialOption grpc.DialOption) (err error) {
var lastCompactRevision uint16 = 0
var compactRevision uint16 = 0
var masterMap *needle.CompactMap
for i := 0; i < 3; i++ {
- if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, v.Id); err != nil {
+ if masterMap, _, compactRevision, err = fetchVolumeFileEntries(volumeServer, grpcDialOption, v.Id); err != nil {
return fmt.Errorf("Failed to sync volume %d entries with %s: %v", v.Id, volumeServer, err)
}
if lastCompactRevision != compactRevision && lastCompactRevision != 0 {
@@ -62,7 +63,7 @@ func (v *Volume) Synchronize(volumeServer string) (err error) {
}
}
lastCompactRevision = compactRevision
- if err = v.trySynchronizing(volumeServer, masterMap, compactRevision); err == nil {
+ if err = v.trySynchronizing(volumeServer, grpcDialOption, masterMap, compactRevision); err == nil {
return
}
}
@@ -77,7 +78,7 @@ func (a ByOffset) Less(i, j int) bool { return a[i].Offset < a[j].Offset }
// trySynchronizing sync with remote volume server incrementally by
// make up the local and remote delta.
-func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.CompactMap, compactRevision uint16) error {
+func (v *Volume) trySynchronizing(volumeServer string, grpcDialOption grpc.DialOption, masterMap *needle.CompactMap, compactRevision uint16) error {
slaveIdxFile, err := os.Open(v.nm.IndexFileName())
if err != nil {
return fmt.Errorf("Open volume %d index file: %v", v.Id, err)
@@ -126,7 +127,7 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
continue
}
// add master file entry to local data file
- if err := v.fetchNeedle(volumeServer, needleValue, compactRevision); err != nil {
+ if err := v.fetchNeedle(volumeServer, grpcDialOption, needleValue, compactRevision); err != nil {
glog.V(0).Infof("Fetch needle %v from %s: %v", needleValue, volumeServer, err)
return err
}
@@ -136,16 +137,16 @@ func (v *Volume) trySynchronizing(volumeServer string, masterMap *needle.Compact
return nil
}
-func fetchVolumeFileEntries(volumeServer string, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
+func fetchVolumeFileEntries(volumeServer string, grpcDialOption grpc.DialOption, vid VolumeId) (m *needle.CompactMap, lastOffset uint64, compactRevision uint16, err error) {
m = needle.NewCompactMap()
- syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, uint32(vid))
+ syncStatus, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
return m, 0, 0, err
}
total := 0
- err = operation.GetVolumeIdxEntries(volumeServer, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
+ err = operation.GetVolumeIdxEntries(volumeServer, grpcDialOption, uint32(vid), func(key NeedleId, offset Offset, size uint32) {
// println("remote key", key, "offset", offset*NeedlePaddingSize, "size", size)
if offset > 0 && size != TombstoneFileSize {
m.Set(NeedleId(key), offset, size)
@@ -187,9 +188,9 @@ func (v *Volume) removeNeedle(key NeedleId) {
// fetchNeedle fetches a remote volume needle by vid, id, offset
// The compact revision is checked first in case the remote volume
// is compacted and the offset is invalid any more.
-func (v *Volume) fetchNeedle(volumeServer string, needleValue needle.NeedleValue, compactRevision uint16) error {
+func (v *Volume) fetchNeedle(volumeServer string, grpcDialOption grpc.DialOption, needleValue needle.NeedleValue, compactRevision uint16) error {
- return operation.WithVolumeServerClient(volumeServer, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncData(context.Background(), &volume_server_pb.VolumeSyncDataRequest{
VolumdId: uint32(v.Id),
Revision: uint32(compactRevision),
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 55796ab43..ff0bbce42 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -2,6 +2,7 @@ package topology
import (
"context"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -13,9 +14,9 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
+func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid storage.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.Url(), func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index a301103eb..041351492 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -1,6 +1,7 @@
package topology
import (
+ "google.golang.org/grpc"
"math/rand"
"time"
@@ -8,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallocate int64) {
+func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) {
go func() {
for {
if t.IsLeader() {
@@ -22,7 +23,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold float64, preallo
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
- t.Vacuum(garbageThreshold, preallocate)
+ t.Vacuum(grpcDialOption, garbageThreshold, preallocate)
}
}
}(garbageThreshold)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 48a75ba9d..71d3ead76 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,6 +2,7 @@ package topology
import (
"context"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,11 +11,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
+func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -46,13 +47,13 @@ func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist
}
return isCheckSuccess
}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
+func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumdId: uint32(vid),
})
@@ -79,11 +80,11 @@ func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationli
}
return isVacuumSuccess
}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumdId: uint32(vid),
})
@@ -101,10 +102,10 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
return isCommitSuccess
}
-func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
+func batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumdId: uint32(vid),
})
@@ -118,21 +119,21 @@ func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationli
}
}
-func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
+func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, preallocate int64) int {
glog.V(1).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
- vacuumOneVolumeLayout(volumeLayout, c, garbageThreshold, preallocate)
+ vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, preallocate)
}
}
}
return 0
}
-func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
+func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, preallocate int64) {
volumeLayout.accessLock.RLock()
tmpMap := make(map[storage.VolumeId]*VolumeLocationList)
@@ -152,11 +153,11 @@ func vacuumOneVolumeLayout(volumeLayout *VolumeLayout, c *Collection, garbageThr
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if batchVacuumVolumeCheck(volumeLayout, vid, locationList, garbageThreshold) {
- if batchVacuumVolumeCompact(volumeLayout, vid, locationList, preallocate) {
- batchVacuumVolumeCommit(volumeLayout, vid, locationList)
+ if batchVacuumVolumeCheck(grpcDialOption, volumeLayout, vid, locationList, garbageThreshold) {
+ if batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, locationList, preallocate) {
+ batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, locationList)
} else {
- batchVacuumVolumeCleanup(volumeLayout, vid, locationList)
+ batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, locationList)
}
}
}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 9bf013ca6..3d178b827 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -2,6 +2,7 @@ package topology
import (
"fmt"
+ "google.golang.org/grpc"
"math/rand"
"sync"
@@ -55,19 +56,19 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(grpcDialOption, vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, option); e == nil {
+ if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
counter += c
} else {
return counter, e
@@ -76,13 +77,13 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOp
return
}
-func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
return 0, e
}
vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, option, servers...)
+ err := vg.grow(grpcDialOption, topo, vid, option, servers...)
return len(servers), err
}
@@ -189,9 +190,9 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
return
}
-func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option); err == nil {
+ if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
index 7fa650855..b26366ae0 100644
--- a/weed/util/grpc_client_server.go
+++ b/weed/util/grpc_client_server.go
@@ -38,7 +38,7 @@ func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
// opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
var options []grpc.DialOption
options = append(options,
- grpc.WithInsecure(),
+ // grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long
Timeout: 20 * time.Second,
@@ -48,7 +48,7 @@ func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error)
options = append(options, opt)
}
}
- return grpc.Dial(address, opts...)
+ return grpc.Dial(address, options...)
}
func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index b26853945..3600fe7c7 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -3,29 +3,32 @@ package wdclient
import (
"context"
"fmt"
+ "math/rand"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "math/rand"
+ "google.golang.org/grpc"
)
type MasterClient struct {
- ctx context.Context
- name string
- currentMaster string
- masters []string
+ ctx context.Context
+ name string
+ currentMaster string
+ masters []string
+ grpcDialOption grpc.DialOption
vidMap
}
-func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
+func NewMasterClient(ctx context.Context, grpcDialOption grpc.DialOption, clientName string, masters []string) *MasterClient {
return &MasterClient{
- ctx: ctx,
- name: clientName,
- masters: masters,
- vidMap: newVidMap(),
+ ctx: ctx,
+ name: clientName,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ vidMap: newVidMap(),
}
}
@@ -50,7 +53,7 @@ func (mc *MasterClient) KeepConnectedToMaster() {
func (mc *MasterClient) tryAllMasters() {
for _, master := range mc.masters {
glog.V(0).Infof("Connecting to master %v", master)
- gprcErr := withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ gprcErr := withMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
if err != nil {
@@ -96,14 +99,14 @@ func (mc *MasterClient) tryAllMasters() {
}
}
-func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+func withMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0)
if parseErr != nil {
return fmt.Errorf("failed to parse master grpc %v", master)
}
- grpcConnection, err := util.GrpcDial(masterGrpcAddress)
+ grpcConnection, err := util.GrpcDial(masterGrpcAddress, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", master, err)
}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
deleted file mode 100644
index 722f4d061..000000000
--- a/weed/wdclient/wdclient.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package wdclient
-
-import (
- "context"
-)
-
-type SeaweedClient struct {
- *MasterClient
-}
-
-func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
- return &SeaweedClient{
- MasterClient: NewMasterClient(ctx, clientName, masters),
- }
-}