diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2019-02-18 15:05:32 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-18 15:05:32 -0800 |
| commit | 9a4dda30118fa4e076b2ef4ea6abe14ca898a84e (patch) | |
| tree | c5d379b2a92d78607e82246ca2abed09bd1c0b2d /weed/server | |
| parent | a1c7dc380683d44e59a18c2e71c9c3aa7734835f (diff) | |
| parent | 77b9af531d18e10b04b49b069b5f26a329ed4902 (diff) | |
| download | seaweedfs-9a4dda30118fa4e076b2ef4ea6abe14ca898a84e.tar.xz seaweedfs-9a4dda30118fa4e076b2ef4ea6abe14ca898a84e.zip | |
Merge pull request #855 from chrislusf/add_jwt
Add jwt
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 9 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 14 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 13 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 9 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 1 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 4 | ||||
| -rw-r--r-- | weed/server/master_server.go | 13 | ||||
| -rw-r--r-- | weed/server/master_server_handlers.go | 33 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 10 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 15 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 40 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers.go | 31 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 16 |
15 files changed, 155 insertions, 60 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index d88abfdc8..1c75d44cf 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "net/http" "path/filepath" "strconv" @@ -13,7 +14,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -82,8 +82,7 @@ func debug(params ...interface{}) { glog.V(4).Infoln(params...) } -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { - jwt := security.GetJwt(r) +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) { m := make(map[string]interface{}) if r.Method != "POST" { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -113,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), } - assignResult, ae := operation.Assign(masterUrl, ar) + assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return @@ -125,7 +124,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 06589e3c6..4f1377331 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -220,7 +220,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol DataCenter: "", } } - assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest) + assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { return nil, fmt.Errorf("assign volume: %v", err) } @@ -233,6 +233,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Count: int32(assignResult.Count), Url: assignResult.Url, PublicUrl: assignResult.PublicUrl, + Auth: string(assignResult.Auth), }, err } @@ -253,7 +254,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR Ttl: req.Ttl, } - output, err := operation.Statistics(fs.filer.GetMaster(), input) + output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input) if err != nil { return nil, err } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 9d70e4dac..2ace0a7ea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "google.golang.org/grpc" "net/http" "os" @@ -28,29 +29,30 @@ type FilerOption struct { RedirectOnRead bool DisableDirListing bool MaxMB int - SecretKey string DirListingLimit int DataCenter string DefaultLevelDbDir string } type FilerServer struct { - option *FilerOption - secret security.Secret - filer *filer2.Filer + option *FilerOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, + option: option, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), } if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption) go fs.filer.KeepConnectedToMaster() diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 32f481e74..9e231c645 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "net/url" + "os" "strconv" "strings" "time" @@ -14,8 +15,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "os" ) var ( @@ -31,7 +32,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { +func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { ar := &operation.VolumeAssignRequest{ Count: 1, Replication: replication, @@ -50,7 +51,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } } - assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) writeJsonError(w, r, http.StatusInternalServerError, ae) @@ -59,6 +60,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + auth = assignResult.Auth return } @@ -82,7 +84,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - fileId, urlLocation, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) @@ -115,6 +117,9 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { Host: r.Host, ContentLength: r.ContentLength, } + if auth != "" { + request.Header.Set("Authorization", "BEARER "+string(auth)) + } resp, do_err := util.Do(request) if do_err != nil { glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 4b1745aaa..b9c0691c7 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -105,14 +106,14 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { writtenChunks = writtenChunks + 1 - fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) if assignErr != nil { return nil, assignErr } // upload the chunk to the volume server chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10) - uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth) if uploadErr != nil { return nil, uploadErr } @@ -175,11 +176,11 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte return } -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) { err = nil ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) - uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth) if uploadResult != nil { glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) } diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 93dce59d8..043a6ff51 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -67,7 +67,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), }); err != nil { return err } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index ae0819d2d..13f8b37d1 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -75,7 +76,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { ms.vgLock.Unlock() return nil, fmt.Errorf("Cannot grow volume group! %v", err) } @@ -92,6 +93,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count, + Auth: string(security.GenJwt(ms.guard.SigningKey, fid)), }, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 492bb76e9..a44a567d6 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" @@ -15,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/spf13/viper" ) type MasterServer struct { @@ -36,6 +38,8 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation + + grpcDialOpiton grpc.DialOption } func NewMasterServer(r *mux.Router, port int, metaFolder string, @@ -45,9 +49,11 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, defaultReplicaPlacement string, garbageThreshold float64, whiteList []string, - secureKey string, ) *MasterServer { + v := viper.GetViper() + signingKey := v.GetString("jwt.signing.key") + var preallocateSize int64 if preallocate { preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) @@ -60,6 +66,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, defaultReplicaPlacement: defaultReplicaPlacement, garbageThreshold: garbageThreshold, clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() @@ -67,7 +74,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - ms.guard = security.NewGuard(whiteList, secureKey) + ms.guard = security.NewGuard(whiteList, signingKey) handleStaticResources2(r) r.HandleFunc("/", ms.uiStatusHandler) @@ -85,7 +92,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate) return ms } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index a797dddfc..5bdb448c1 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" ) @@ -40,12 +41,23 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume return } -// Takes one volumeId only, can not do batch lookup +// If "fileId" is provided, this returns the fileId location and a JWT to update or delete the file. +// If "volumeId" is provided, this only returns the volumeId location func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] + if vid != "" { + // backward compatible + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + } + fileId := r.FormValue("fileId") + if fileId != "" { + commaSep := strings.Index(fileId, ",") + if commaSep > 0 { + vid = fileId[0:commaSep] + } } vids := []string{vid} collection := r.FormValue("collection") //optional, but can be faster if too many collections @@ -54,6 +66,8 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) httpStatus := http.StatusOK if location.Error != "" { httpStatus = http.StatusNotFound + } else { + ms.maybeAddJwtAuthorization(w, fileId) } writeJsonQuiet(w, r, httpStatus, location) } @@ -79,7 +93,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) ms.vgLock.Lock() defer ms.vgLock.Unlock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Cannot grow volume group! %v", err)) return @@ -88,8 +102,17 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { + ms.maybeAddJwtAuthorization(w, fid) writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) } } + +func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string) { + encodedJwt := security.GenJwt(ms.guard.SigningKey, fileId) + if encodedJwt == "" { + return + } + w.Header().Set("Authorization", "BEARER "+string(encodedJwt)) +} diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 3a2662908..eccf3ee4c 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -24,7 +24,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() @@ -60,7 +60,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold, ms.preallocate) + ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate) ms.dirStatusHandler(w, r) } @@ -76,7 +76,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) } else { - count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) + count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) } } else { err = errors.New("parameter count is not found") @@ -126,13 +126,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r)) + submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl) + submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton) } } } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index c332da38e..7afef0b15 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -131,7 +131,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, func isTheFirstOne(self string, peers []string) bool { sort.Strings(peers) - if len(peers)<=0{ + if len(peers) <= 0 { return true } return self == peers[0] diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index bd3ffd7b3..38603e4b6 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,11 +2,13 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" + "google.golang.org/grpc" "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "golang.org/x/net/context" ) @@ -20,6 +22,8 @@ func (vs *VolumeServer) heartbeat() { vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume") + var err error var newLeader string for { @@ -32,7 +36,7 @@ func (vs *VolumeServer) heartbeat() { glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress) continue } - newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second) + newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -41,9 +45,9 @@ func (vs *VolumeServer) heartbeat() { } } -func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(masterGrpcAddress) + grpcConection, err := util.GrpcDial(masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } @@ -73,9 +77,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI if in.GetVolumeSizeLimit() != 0 { vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit() } - if in.GetSecretKey() != "" { - vs.guard.SecretKey = security.Secret(in.GetSecretKey()) - } if in.GetLeader() != "" && masterNode != in.GetLeader() { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) newLeader = in.GetLeader() diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 0914e81b0..8e77ec570 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,21 +1,24 @@ package weed_server import ( + "google.golang.org/grpc" "net/http" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/spf13/viper" ) type VolumeServer struct { - MasterNodes []string - currentMaster string - pulseSeconds int - dataCenter string - rack string - store *storage.Store - guard *security.Guard + MasterNodes []string + currentMaster string + pulseSeconds int + dataCenter string + rack string + store *storage.Store + guard *security.Guard + grpcDialOption grpc.DialOption needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -31,6 +34,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, whiteList []string, fixJpgOrientation bool, readRedirect bool) *VolumeServer { + + v := viper.GetViper() + signingKey := v.GetString("jwt.signing.key") + enableUiAccess := v.GetBool("access.ui") + vs := &VolumeServer{ pulseSeconds: pulseSeconds, dataCenter: dataCenter, @@ -38,18 +46,22 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), } vs.MasterNodes = masterNodes vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - vs.guard = security.NewGuard(whiteList, "") + vs.guard = security.NewGuard(whiteList, signingKey) handleStaticResources(adminMux) - adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + if signingKey == "" || enableUiAccess { + // only expose the volume server details for safe environments + adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) + adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + } adminMux.HandleFunc("/", vs.privateStoreHandler) if publicMux != adminMux { // separated admin and public port @@ -69,5 +81,5 @@ func (vs *VolumeServer) Shutdown() { } func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.guard.SecretKey, fileId) + return security.GenJwt(vs.guard.SigningKey, fileId) } diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 77b1274fd..0e9aaeb3b 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -3,6 +3,8 @@ package weed_server import ( "net/http" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" ) @@ -45,3 +47,32 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req vs.GetOrHeadHandler(w, r) } } + +func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string) bool { + + if len(vs.guard.SigningKey) == 0 { + return true + } + + tokenStr := security.GetJwt(r) + if tokenStr == "" { + glog.V(1).Infof("missing jwt from %s", r.RemoteAddr) + return false + } + + token, err := security.DecodeJwt(vs.guard.SigningKey, tokenStr) + if err != nil { + glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err) + return false + } + if !token.Valid { + glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr) + return false + } + + if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok { + return sc.Fid == vid+","+fid + } + glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr) + return false +} diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index fd93142e1..6b78cea40 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -20,13 +20,20 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, http.StatusBadRequest, e) return } - vid, _, _, _, _ := parseURLPath(r.URL.Path) + + vid, fid, _, _, _ := parseURLPath(r.URL.Path) volumeId, ve := storage.NewVolumeId(vid) if ve != nil { glog.V(0).Infoln("NewVolumeId error:", ve) writeJsonError(w, r, http.StatusBadRequest, ve) return } + + if !vs.maybeCheckJwtAuthorization(r, vid, fid) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) @@ -56,6 +63,11 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { volumeId, _ := storage.NewVolumeId(vid) n.ParsePath(fid) + if !vs.maybeCheckJwtAuthorization(r, vid, fid) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + // glog.V(2).Infof("volume %s deleting %s", vid, n) cookie := n.Cookie @@ -83,7 +95,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } |
