diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-02-18 12:11:52 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-02-18 12:11:52 -0800 |
| commit | 77b9af531d18e10b04b49b069b5f26a329ed4902 (patch) | |
| tree | cae2524dfc445b352e5d6bab7a82f7af46b7a4c8 /weed/server | |
| parent | 55761ae806bc7cc8ab34424508aee5481131b941 (diff) | |
| download | seaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.tar.xz seaweedfs-77b9af531d18e10b04b49b069b5f26a329ed4902.zip | |
adding grpc mutual tls
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 13 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 2 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server.go | 7 | ||||
| -rw-r--r-- | weed/server/master_server_handlers.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 11 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 18 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 2 |
11 files changed, 45 insertions, 31 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index c9f17aa86..1c75d44cf 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "net/http" "path/filepath" "strconv" @@ -81,7 +82,7 @@ func debug(params ...interface{}) { glog.V(4).Infoln(params...) } -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) { m := make(map[string]interface{}) if r.Method != "POST" { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -111,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), } - assignResult, ae := operation.Assign(masterUrl, ar) + assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 9a83ee1a6..4f1377331 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -220,7 +220,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol DataCenter: "", } } - assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest) + assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { return nil, fmt.Errorf("assign volume: %v", err) } @@ -254,7 +254,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR Ttl: req.Ttl, } - output, err := operation.Statistics(fs.filer.GetMaster(), input) + output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input) if err != nil { return nil, err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c3c5072d0..2ace0a7ea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "google.golang.org/grpc" "net/http" "os" @@ -34,22 +35,24 @@ type FilerOption struct { } type FilerServer struct { - option *FilerOption - secret security.SigningKey - filer *filer2.Filer + option *FilerOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, + option: option, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), } if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption) go fs.filer.KeepConnectedToMaster() diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 7cdbddde2..9e231c645 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -51,7 +51,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } } - assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) writeJsonError(w, r, http.StatusInternalServerError, ae) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 6e9cd512d..13f8b37d1 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -76,7 +76,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { ms.vgLock.Unlock() return nil, fmt.Errorf("Cannot grow volume group! %v", err) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 06c959b92..a44a567d6 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" @@ -37,6 +38,8 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation + + grpcDialOpiton grpc.DialOption } func NewMasterServer(r *mux.Router, port int, metaFolder string, @@ -48,7 +51,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, whiteList []string, ) *MasterServer { - LoadConfiguration("security", false) v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -64,6 +66,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, defaultReplicaPlacement: defaultReplicaPlacement, garbageThreshold: garbageThreshold, clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() @@ -89,7 +92,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate) return ms } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index c4149e0cf..5bdb448c1 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -93,7 +93,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) ms.vgLock.Lock() defer ms.vgLock.Unlock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Cannot grow volume group! %v", err)) return diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 3a2662908..eccf3ee4c 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -24,7 +24,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() @@ -60,7 +60,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold, ms.preallocate) + ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate) ms.dirStatusHandler(w, r) } @@ -76,7 +76,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) } else { - count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) + count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) } } else { err = errors.New("parameter count is not found") @@ -126,13 +126,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r)) + submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl) + submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton) } } } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 25e9b1677..38603e4b6 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,6 +2,9 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "google.golang.org/grpc" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -19,6 +22,8 @@ func (vs *VolumeServer) heartbeat() { vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume") + var err error var newLeader string for { @@ -31,7 +36,7 @@ func (vs *VolumeServer) heartbeat() { glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress) continue } - newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second) + newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -40,9 +45,9 @@ func (vs *VolumeServer) heartbeat() { } } -func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(masterGrpcAddress) + grpcConection, err := util.GrpcDial(masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index d8ff01766..8e77ec570 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "google.golang.org/grpc" "net/http" "github.com/chrislusf/seaweedfs/weed/glog" @@ -10,13 +11,14 @@ import ( ) type VolumeServer struct { - MasterNodes []string - currentMaster string - pulseSeconds int - dataCenter string - rack string - store *storage.Store - guard *security.Guard + MasterNodes []string + currentMaster string + pulseSeconds int + dataCenter string + rack string + store *storage.Store + guard *security.Guard + grpcDialOption grpc.DialOption needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -33,7 +35,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fixJpgOrientation bool, readRedirect bool) *VolumeServer { - LoadConfiguration("security", false) v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") enableUiAccess := v.GetBool("access.ui") @@ -45,6 +46,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), } vs.MasterNodes = masterNodes vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 1cfd9187e..6b78cea40 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -95,7 +95,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } |
