aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-07-28 03:58:13 -0700
committerChris Lu <chris.lu@gmail.com>2019-07-28 03:58:13 -0700
commit8afd8d35b3230f6fc286967e8aa9641bd8c1460c (patch)
tree8c134e958969a7aec6337c8c7bab120009006941
parent2c6cf72e73861f3bdefaba28189fa2b15fa26d9e (diff)
downloadseaweedfs-8afd8d35b3230f6fc286967e8aa9641bd8c1460c.tar.xz
seaweedfs-8afd8d35b3230f6fc286967e8aa9641bd8c1460c.zip
master: followers can also lookup and redirect
improve scalability
-rw-r--r--weed/command/master.go13
-rw-r--r--weed/command/server.go6
-rw-r--r--weed/filer2/filer_deletion.go2
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/master_server.go15
-rw-r--r--weed/server/master_server_handlers.go49
-rw-r--r--weed/server/master_server_handlers_admin.go16
-rw-r--r--weed/shell/command_ec_encode.go4
-rw-r--r--weed/wdclient/vid_map.go15
9 files changed, 76 insertions, 50 deletions
diff --git a/weed/command/master.go b/weed/command/master.go
index fb09a24b4..3d33f4f7a 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -100,8 +100,11 @@ func runMaster(cmd *Command, args []string) bool {
}
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
+
+ myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
+
r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList))
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.VERSION, listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
@@ -109,7 +112,6 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
- myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds)
if raftServer == nil {
@@ -131,6 +133,8 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterOption.ipBind, grpcPort)
go grpcS.Serve(grpcL)
+ go ms.MasterClient.KeepConnectedToMaster()
+
// start http server
httpS := &http.Server{Handler: r}
go httpS.Serve(masterListener)
@@ -152,11 +156,10 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
}
}
- peerCount := len(cleanedPeers)
if !hasSelf {
- peerCount += 1
+ cleanedPeers = append(cleanedPeers, masterAddress)
}
- if peerCount%2 == 0 {
+ if len(cleanedPeers)%2 == 0 {
glog.Fatalf("Only odd number of masters are supported!")
}
return
diff --git a/weed/command/server.go b/weed/command/server.go
index d39abd9ae..1c6439edc 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -5,7 +5,6 @@ import (
"os"
"runtime"
"runtime/pprof"
- "strconv"
"strings"
"time"
@@ -122,14 +121,13 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- master := *serverIp + ":" + strconv.Itoa(*masterOptions.port)
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = &master
+ filerOptions.masters = masterOptions.peers
filerOptions.ip = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = &master
+ serverOptions.v.masters = masterOptions.peers
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index fea93d57f..25e27e504 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -15,7 +15,7 @@ func (f *Filer) loopProcessingDeletion() {
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
- locs := f.MasterClient.GetVidLocations(vid)
+ locs, _ := f.MasterClient.GetVidLocations(vid)
var locations []operation.Location
for _, loc := range locs {
locations = append(locations, operation.Location{
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 8eea2441e..0b395701d 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -95,7 +95,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return nil, err
}
var locs []*filer_pb.Location
- for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ continue
+ }
+ for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3689b5495..e9eb32cca 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,8 +1,10 @@
package weed_server
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/shell"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
"net/http"
"net/http/httputil"
@@ -56,9 +58,11 @@ type MasterServer struct {
clientChans map[string]chan *master_pb.VolumeLocation
grpcDialOpiton grpc.DialOption
+
+ MasterClient *wdclient.MasterClient
}
-func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -73,11 +77,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
+
+ grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
- grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
+ grpcDialOpiton: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
@@ -92,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
@@ -102,7 +109,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
+ r.HandleFunc("/{fileId}", ms.redirectHandler)
}
ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 5c7ff41cf..93f983375 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -22,21 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if _, ok := volumeLocations[vid]; ok {
continue
}
- volumeId, err := needle.NewVolumeId(vid)
- if err == nil {
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil {
- var ret []operation.Location
- for _, dn := range machines {
- ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
- }
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)}
- }
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)}
- }
+ volumeLocations[vid] = ms.findVolumeLocation(collection, vid)
}
return
}
@@ -59,10 +45,8 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
vid = fileId[0:commaSep]
}
}
- vids := []string{vid}
collection := r.FormValue("collection") //optional, but can be faster if too many collections
- volumeLocations := ms.lookupVolumeId(vids, collection)
- location := volumeLocations[vid]
+ location := ms.findVolumeLocation(collection, vid)
httpStatus := http.StatusOK
if location.Error != "" {
httpStatus = http.StatusNotFound
@@ -74,6 +58,35 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
writeJsonQuiet(w, r, httpStatus, location)
}
+// findVolumeLocation finds the volume location from master topo if it is leader,
+// or from master client if not leader
+func (ms *MasterServer) findVolumeLocation(collection string, vid string) operation.LookupResult {
+ var locations []operation.Location
+ var err error
+ if ms.Topo.IsLeader() {
+ volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
+ machines := ms.Topo.Lookup(collection, volumeId)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ }
+ err = newVolumeIdErr
+ } else {
+ machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
+ }
+ err = getVidLocationsErr
+ }
+ ret := operation.LookupResult{
+ VolumeId: vid,
+ Locations: locations,
+ }
+ if err != nil {
+ ret.Error = err.Error()
+ }
+ return ret
+}
+
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
stats.AssignRequest()
requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 343bcb8da..6b5da1132 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -95,23 +95,19 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := needle.NewVolumeId(vid)
- if err != nil {
- debug("parsing error:", err, r.URL.Path)
- return
- }
collection := r.FormValue("collection")
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil && len(machines) > 0 {
+ location := ms.findVolumeLocation(collection, vid)
+ if location.Error == "" {
+ loc := location.Locations[rand.Intn(len(location.Locations))]
var url string
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
http.Redirect(w, r, url, http.StatusMovedPermanently)
} else {
- writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection))
+ writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
}
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 8ad0d51c8..f07cb93f9 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -87,8 +87,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
- locations := commandEnv.MasterClient.GetLocations(uint32(vid))
- if len(locations) == 0 {
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
return fmt.Errorf("volume %d not found", vid)
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 06308944d..01d9cdaed 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -64,20 +64,25 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er
return serverUrl, nil
}
-func (vc *vidMap) GetVidLocations(vid string) (locations []Location) {
+func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
- return nil
+ return nil, fmt.Errorf("Unknown volume id %s", vid)
}
- return vc.GetLocations(uint32(id))
+ foundLocations, found := vc.GetLocations(uint32(id))
+ if found {
+ return foundLocations, nil
+ }
+ return nil, fmt.Errorf("volume id %s not found", vid)
}
-func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
+func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
vc.RLock()
defer vc.RUnlock()
- return vc.vid2Locations[vid]
+ locations, found = vc.vid2Locations[vid]
+ return
}
func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) {