diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 94 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 10 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 2 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 12 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client.go | 52 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 33 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_ui.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 6 | ||||
| -rw-r--r-- | weed/server/volume_server_ui/templates.go | 4 |
12 files changed, 81 insertions, 152 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index da0014af3..8a80cded5 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -88,7 +88,7 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { - lookupResult, err := operation.LookupVolumeIds(fs.getMasterNode(), req.VolumeIds) + lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds) if err != nil { return nil, err } @@ -172,11 +172,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr if err = fs.filer.UpdateEntry(newEntry); err == nil { for _, garbage := range unusedChunks { glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId)) + operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) } for _, garbage := range garbages { glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.master, garbage.FileId, fs.jwt(garbage.FileId)) + operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) } } @@ -190,7 +190,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - assignResult, err := operation.Assign(fs.master, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(fs.filer.GetMaster(), &operation.VolumeAssignRequest{ Count: uint64(req.Count), Replication: req.Replication, Collection: req.Collection, diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 3e175e960..827971a0d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,12 +1,8 @@ package weed_server import ( - "math/rand" "net/http" "strconv" - "sync" - "time" - "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" @@ -14,16 +10,13 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" ) type FilerServer struct { port string - master string - mnLock sync.RWMutex + masters []string collection string defaultReplication string redirectOnRead bool @@ -31,16 +24,15 @@ type FilerServer struct { secret security.Secret filer *filer2.Filer maxMB int - masterNodes *storage.MasterNodes } -func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, collection string, +func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, masters []string, collection string, replication string, redirectOnRead bool, disableDirListing bool, maxMB int, secret string, ) (fs *FilerServer, err error) { fs = &FilerServer{ - master: master, + masters: masters, collection: collection, defaultReplication: replication, redirectOnRead: redirectOnRead, @@ -48,7 +40,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } - fs.filer = filer2.NewFiler(master) + + if len(masters) == 0 { + glog.Fatal("master list is required!") + } + + fs.filer = filer2.NewFiler(masters) + + go fs.filer.KeepConnectedToMaster() fs.filer.LoadConfiguration() @@ -59,78 +58,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - go func() { - connected := true - - fs.masterNodes = storage.NewMasterNodes(fs.master) - glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode()) - - for { - glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode()) - master, err := fs.detectHealthyMaster(fs.getMasterNode()) - if err == nil { - if !connected { - connected = true - if fs.getMasterNode() != master { - fs.setMasterNode(master) - } - glog.V(0).Infoln("Filer Server Connected with master at", master) - } - } else { - glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err) - if connected { - connected = false - } - } - if connected { - time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond) - } else { - time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond) - } - } - }() - return fs, nil } func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { return security.GenJwt(fs.secret, fileId) } - -func (fs *FilerServer) getMasterNode() string { - fs.mnLock.RLock() - defer fs.mnLock.RUnlock() - return fs.master -} - -func (fs *FilerServer) setMasterNode(masterNode string) { - fs.mnLock.Lock() - defer fs.mnLock.Unlock() - fs.master = masterNode -} - -func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) { - if e = checkMaster(masterNode); e != nil { - fs.masterNodes.Reset() - for i := 0; i <= 3; i++ { - master, e = fs.masterNodes.FindMaster() - if e != nil { - continue - } else { - if e = checkMaster(master); e == nil { - break - } - } - } - } else { - master = masterNode - } - return -} - -func checkMaster(masterNode string) error { - statUrl := "http://" + masterNode + "/stats/health" - glog.V(4).Infof("Connecting to %s ...", statUrl) - _, e := util.Get(statUrl) - return e -} diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 7a208b560..c690575b6 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -63,7 +63,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, fileId := entry.Chunks[0].FileId - urlString, err := operation.LookupFileId(fs.getMasterNode(), fileId) + urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) w.WriteHeader(http.StatusNotFound) @@ -225,7 +225,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int for _, chunkView := range chunkViews { - urlString, err := operation.LookupFileId(fs.getMasterNode(), chunkView.FileId) + urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 7d93d4485..4c2820e6b 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -32,7 +32,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques writeJsonError(w, r, http.StatusInternalServerError, err) } else { fileId = entry.Chunks[0].FileId - urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId) + urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) @@ -48,7 +48,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, Collection: collection, Ttl: r.URL.Query().Get("ttl"), } - assignResult, ae := operation.Assign(fs.getMasterNode(), ar) + assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar) if ae != nil { glog.V(0).Infoln("failing to assign a file id", ae.Error()) writeJsonError(w, r, http.StatusInternalServerError, ae) @@ -145,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -157,7 +157,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "PUT" { if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { oldFid := entry.Chunks[0].FileId - operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid)) } else if err != nil && err != filer2.ErrNotFound { glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) } @@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { }}, } if db_err := fs.filer.CreateEntry(entry); db_err != nil { - operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up + operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 5156ae02c..adc50d030 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -147,7 +147,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { for _, chunk := range entry.Chunks { oldFid := chunk.FileId - operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) + operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid)) } } else if err != nil { glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index d9b8f9e09..e97cc126e 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -77,3 +77,15 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } } + +func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error { + for { + _, err := stream.Recv() + if err != nil { + return err + } + if err := stream.Send(&master_pb.Empty{}); err != nil { + return err + } + } +} diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go index 2f3f36924..7688745e2 100644 --- a/weed/server/volume_grpc_client.go +++ b/weed/server/volume_grpc_client.go @@ -7,49 +7,51 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage" "golang.org/x/net/context" "google.golang.org/grpc" ) +func (vs *VolumeServer) GetMaster() string { + return vs.currentMaster +} func (vs *VolumeServer) heartbeat() { - glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) - vs.masterNodes = storage.NewMasterNodes(vs.masterNode) + glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes) vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) + var err error + var newLeader string for { - err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second) - if err != nil { - glog.V(0).Infof("heartbeat error: %v", err) - time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + for _, master := range vs.MasterNodes { + if newLeader != "" { + master = newLeader + } + newLeader, err = vs.doHeartbeat(master, time.Duration(vs.pulseSeconds)*time.Second) + if err != nil { + glog.V(0).Infof("heartbeat error: %v", err) + time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + } } } } -func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { - - vs.masterNodes.Reset() - masterNode, err := vs.masterNodes.FindMaster() - if err != nil { - return fmt.Errorf("No master found: %v", err) - } +func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) { grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure()) if err != nil { - return fmt.Errorf("fail to dial: %v", err) + return "", fmt.Errorf("fail to dial: %v", err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) stream, err := client.SendHeartbeat(context.Background()) if err != nil { - glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err) - return err + glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) + return "", err } - vs.SetMasterNode(masterNode) - glog.V(0).Infof("Heartbeat to %s", masterNode) + glog.V(0).Infof("Heartbeat to: %v", masterNode) + vs.currentMaster = masterNode vs.store.Client = stream defer func() { vs.store.Client = nil }() @@ -70,7 +72,8 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { vs.guard.SecretKey = security.Secret(in.GetSecretKey()) } if in.GetLeader() != "" && masterNode != in.GetLeader() { - vs.masterNodes.SetPossibleLeader(in.GetLeader()) + glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) + newLeader = in.GetLeader() doneChan <- nil return } @@ -79,7 +82,7 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return err + return "", err } tickChan := time.Tick(sleepInterval) @@ -89,11 +92,10 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { case <-tickChan: if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return err + return "", err } - case err := <-doneChan: - glog.V(0).Infof("Volume Server heart beat stops with %v", err) - return err + case <-doneChan: + return } } } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 0b7e09c59..9294f9bf6 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -2,22 +2,19 @@ package weed_server import ( "net/http" - "sync" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" ) type VolumeServer struct { - masterNode string - mnLock sync.RWMutex - pulseSeconds int - dataCenter string - rack string - store *storage.Store - guard *security.Guard - masterNodes *storage.MasterNodes + MasterNodes []string + currentMaster string + pulseSeconds int + dataCenter string + rack string + store *storage.Store + guard *security.Guard needleMapKind storage.NeedleMapType FixJpgOrientation bool @@ -28,7 +25,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, folders []string, maxCounts []int, needleMapKind storage.NeedleMapType, - masterNode string, pulseSeconds int, + masterNodes []string, pulseSeconds int, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, @@ -41,7 +38,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, } - vs.SetMasterNode(masterNode) + vs.MasterNodes = masterNodes vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, "") @@ -76,18 +73,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, return vs } -func (vs *VolumeServer) GetMasterNode() string { - vs.mnLock.RLock() - defer vs.mnLock.RUnlock() - return vs.masterNode -} - -func (vs *VolumeServer) SetMasterNode(masterNode string) { - vs.mnLock.Lock() - defer vs.mnLock.Unlock() - vs.masterNode = masterNode -} - func (vs *VolumeServer) Shutdown() { glog.V(0).Infoln("Shutting down volume server...") vs.store.Close() diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index a90d4c0e2..b784dd60e 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -46,7 +46,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) return } - lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) + lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) @@ -176,7 +176,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, chunkedFileReader := &operation.ChunkedFileReader{ Manifest: chunkManifest, - Master: vs.GetMasterNode(), + Master: vs.GetMaster(), } defer chunkedFileReader.Close() if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil { diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 6fc775a6d..c75c66bae 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -21,14 +21,14 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) } args := struct { Version string - Master string + Masters []string Volumes interface{} DiskStatuses interface{} Stats interface{} Counters *stats.ServerStats }{ util.VERSION, - vs.masterNode, + vs.MasterNodes, vs.store.Status(), ds, infos, diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index e45c2245c..3864ec903 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -31,7 +31,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), + size, errorStatus := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) httpStatus := http.StatusCreated if errorStatus != "" { @@ -80,14 +80,14 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } count = chunkManifest.Size } - _, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) + _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r) if err == nil { m := make(map[string]int64) diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index c3db6e92a..5f01588f4 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -72,8 +72,8 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <h2>System Stats</h2> <table class="table table-condensed table-striped"> <tr> - <th>Master</th> - <td><a href="http://{{.Master}}/ui/index.html">{{.Master}}</a></td> + <th>Masters</th> + <td>{{.Masters}}</td> </tr> <tr> <th>Weekly # ReadRequests</th> |
