aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-02-18 12:11:52 -0800
committerChris Lu <chris.lu@gmail.com>2019-02-18 12:11:52 -0800
commit77b9af531d18e10b04b49b069b5f26a329ed4902 (patch)
treecae2524dfc445b352e5d6bab7a82f7af46b7a4c8 /weed/server
parent55761ae806bc7cc8ab34424508aee5481131b941 (diff)
downloadseaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.tar.xz
seaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.zip
adding grpc mutual tls
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go5
-rw-r--r--weed/server/filer_grpc_server.go4
-rw-r--r--weed/server/filer_server.go13
-rw-r--r--weed/server/filer_server_handlers_write.go2
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server.go7
-rw-r--r--weed/server/master_server_handlers.go2
-rw-r--r--weed/server/master_server_handlers_admin.go10
-rw-r--r--weed/server/volume_grpc_client_to_master.go11
-rw-r--r--weed/server/volume_server.go18
-rw-r--r--weed/server/volume_server_handlers_write.go2
11 files changed, 45 insertions, 31 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index c9f17aa86..1c75d44cf 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"net/http"
"path/filepath"
"strconv"
@@ -81,7 +82,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
-func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
+func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -111,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
}
- assignResult, ae := operation.Assign(masterUrl, ar)
+ assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 9a83ee1a6..4f1377331 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -220,7 +220,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
DataCenter: "",
}
}
- assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest)
+ assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
return nil, fmt.Errorf("assign volume: %v", err)
}
@@ -254,7 +254,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Ttl: req.Ttl,
}
- output, err := operation.Statistics(fs.filer.GetMaster(), input)
+ output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input)
if err != nil {
return nil, err
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c3c5072d0..2ace0a7ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "google.golang.org/grpc"
"net/http"
"os"
@@ -34,22 +35,24 @@ type FilerOption struct {
}
type FilerServer struct {
- option *FilerOption
- secret security.SigningKey
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
go fs.filer.KeepConnectedToMaster()
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 7cdbddde2..9e231c645 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -51,7 +51,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
}
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 6e9cd512d..13f8b37d1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -76,7 +76,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 06c959b92..a44a567d6 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
@@ -37,6 +38,8 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOpiton grpc.DialOption
}
func NewMasterServer(r *mux.Router, port int, metaFolder string,
@@ -48,7 +51,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
whiteList []string,
) *MasterServer {
- LoadConfiguration("security", false)
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -64,6 +66,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
@@ -89,7 +92,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate)
return ms
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index c4149e0cf..5bdb448c1 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -93,7 +93,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 3a2662908..eccf3ee4c 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -24,7 +24,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -60,7 +60,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(gcThreshold, ms.preallocate)
+ ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate)
ms.dirStatusHandler(w, r)
}
@@ -76,7 +76,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
+ count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -126,13 +126,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r))
+ submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl)
+ submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton)
}
}
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 25e9b1677..38603e4b6 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,6 +2,9 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -19,6 +22,8 @@ func (vs *VolumeServer) heartbeat() {
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+
var err error
var newLeader string
for {
@@ -31,7 +36,7 @@ func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress)
continue
}
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
@@ -40,9 +45,9 @@ func (vs *VolumeServer) heartbeat() {
}
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(masterGrpcAddress)
+ grpcConection, err := util.GrpcDial(masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index d8ff01766..8e77ec570 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "google.golang.org/grpc"
"net/http"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,13 +11,14 @@ import (
)
type VolumeServer struct {
- MasterNodes []string
- currentMaster string
- pulseSeconds int
- dataCenter string
- rack string
- store *storage.Store
- guard *security.Guard
+ MasterNodes []string
+ currentMaster string
+ pulseSeconds int
+ dataCenter string
+ rack string
+ store *storage.Store
+ guard *security.Guard
+ grpcDialOption grpc.DialOption
needleMapKind storage.NeedleMapType
FixJpgOrientation bool
@@ -33,7 +35,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fixJpgOrientation bool,
readRedirect bool) *VolumeServer {
- LoadConfiguration("security", false)
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
enableUiAccess := v.GetBool("access.ui")
@@ -45,6 +46,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
}
vs.MasterNodes = masterNodes
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 1cfd9187e..6b78cea40 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -95,7 +95,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
// make sure all chunks had deleted before delete manifest
- if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
+ if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}