diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2018-07-28 21:03:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-07-28 21:03:29 -0700 |
| commit | 452bd0b01393e53e958fb9825bf1f27e6b3522df (patch) | |
| tree | e1a61e592118f9696b7f51501d3b3fd0f6c3eeb5 /weed/server | |
| parent | 97603d6e176dd2b9f2aebd9f6122a8c60481463a (diff) | |
| parent | d3205a007071f26587affb416f71b5c63854b863 (diff) | |
| download | seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.tar.xz seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.zip | |
Merge pull request #702 from chrislusf/add_topo_listener
Add volume id location change listener
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 20 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 6 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 114 | ||||
| -rw-r--r-- | weed/server/master_server.go | 6 |
5 files changed, 128 insertions, 23 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index b2f2d7a2d..0155eeccf 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -99,24 +99,24 @@ func (fs *FilerServer) GetEntryAttributes(ctx context.Context, req *filer_pb.Get func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { - lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster(), req.VolumeIds) - if err != nil { - return nil, err - } - resp := &filer_pb.LookupVolumeResponse{ LocationsMap: make(map[string]*filer_pb.Locations), } - for vid, locations := range lookupResult { + for _, vidString := range req.VolumeIds { + vid, err := strconv.Atoi(vidString) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return nil, err + } var locs []*filer_pb.Location - for _, loc := range locations.Locations { + for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) { locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, }) } - resp.LocationsMap[vid] = &filer_pb.Locations{ + resp.LocationsMap[vidString] = &filer_pb.Locations{ Locations: locs, } } @@ -176,11 +176,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr if err = fs.filer.UpdateEntry(newEntry); err == nil { for _, garbage := range unusedChunks { glog.V(0).Infof("deleting %s old chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) + fs.filer.DeleteFileByFileId(garbage.FileId) } for _, garbage := range garbages { glog.V(0).Infof("deleting %s garbage chunk: %v, [%d, %d)", fullpath, garbage.FileId, garbage.Offset, garbage.Offset+int64(garbage.Size)) - operation.DeleteFile(fs.filer.GetMaster(), garbage.FileId, fs.jwt(garbage.FileId)) + fs.filer.DeleteFileByFileId(garbage.FileId) } } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 4e20be5da..e17cd776d 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/util" "mime" "mime/multipart" @@ -63,7 +62,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, fileId := entry.Chunks[0].FileId - urlString, err := operation.LookupFileId(fs.filer.GetMaster(), fileId) + urlString, err := fs.filer.MasterClient.LookupFileId(fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) w.WriteHeader(http.StatusNotFound) @@ -223,7 +222,7 @@ func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int for _, chunkView := range chunkViews { - urlString, err := operation.LookupFileId(fs.filer.GetMaster(), chunkView.FileId) + urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index c1b7e3826..8a19f3fdb 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -49,7 +49,7 @@ func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Reques w.WriteHeader(http.StatusNoContent) } else { fileId = entry.Chunks[0].FileId - urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId) + urlLocation, err = fs.filer.MasterClient.LookupFileId(fileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err is %s", fileId, err.Error()) w.WriteHeader(http.StatusNotFound) @@ -176,7 +176,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if ret.Name != "" { path += ret.Name } else { - operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up + fs.filer.DeleteFileByFileId(fileId) glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") writeJsonError(w, r, http.StatusInternalServerError, errors.New("Can not to write to folder "+path+" without a file name")) @@ -205,7 +205,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { }}, } if db_err := fs.filer.CreateEntry(entry); db_err != nil { - operation.DeleteFile(fs.filer.GetMaster(), fileId, fs.jwt(fileId)) //clean up + fs.filer.DeleteFileByFileId(fileId) glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) writeJsonError(w, r, http.StatusInternalServerError, db_err) return diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index f24cea619..2952a8071 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -1,9 +1,11 @@ package weed_server import ( + "fmt" "net" "strings" + "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/topology" @@ -16,8 +18,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ defer func() { if dn != nil { + glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) t.UnRegisterDataNode(dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range dn.GetVolumes() { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } + } }() @@ -49,7 +69,26 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } - t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + + message := &master_pb.VolumeLocation{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + } + for _, v := range newVolumes { + message.NewVids = append(message.NewVids, uint32(v.Id)) + } + for _, v := range deletedVolumes { + message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) + } + + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() + } } else { return err @@ -67,15 +106,76 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } -// KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up. +// KeepConnected keep a stream gRPC call to the master. Used by clients to know the master is up. +// And clients gets the up-to-date list of volume locations func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error { - for { - _, err := stream.Recv() - if err != nil { + + req, err := stream.Recv() + if err != nil { + return err + } + + if !ms.Topo.IsLeader() { + return raft.NotLeaderError + } + + // remember client address + ctx := stream.Context() + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return fmt.Errorf("failed to get peer from ctx") + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return fmt.Errorf("failed to get peer address") + } + + clientName := req.Name + pr.Addr.String() + glog.V(0).Infof("+ client %v", clientName) + + messageChan := make(chan *master_pb.VolumeLocation) + stopChan := make(chan bool) + + ms.clientChansLock.Lock() + ms.clientChans[clientName] = messageChan + ms.clientChansLock.Unlock() + + defer func() { + glog.V(0).Infof("- client %v", clientName) + ms.clientChansLock.Lock() + delete(ms.clientChans, clientName) + ms.clientChansLock.Unlock() + }() + + for _, message := range ms.Topo.ToVolumeLocations() { + if err := stream.Send(message); err != nil { return err } - if err := stream.Send(&master_pb.Empty{}); err != nil { - return err + } + + go func() { + for { + _, err := stream.Recv() + if err != nil { + glog.V(2).Infof("- client %v: %v", clientName, err) + stopChan <- true + break + } + } + }() + + for { + select { + case message := <-messageChan: + if err := stream.Send(message); err != nil { + return err + } + case <-stopChan: + return nil } } + + return nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index efa0ae104..07a398ead 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/topology" @@ -31,6 +32,10 @@ type MasterServer struct { vgLock sync.Mutex bounedLeaderChan chan int + + // notifying clients + clientChansLock sync.RWMutex + clientChans map[string]chan *master_pb.VolumeLocation } func NewMasterServer(r *mux.Router, port int, metaFolder string, @@ -54,6 +59,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, pulseSeconds: pulseSeconds, defaultReplicaPlacement: defaultReplicaPlacement, garbageThreshold: garbageThreshold, + clientChans: make(map[string]chan *master_pb.VolumeLocation), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() |
