diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_server.go | 33 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 34 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server.go | 67 |
4 files changed, 72 insertions, 64 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 40be2d7cf..5d2a54f4d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -4,6 +4,7 @@ import ( "net/http" "os" + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -60,7 +61,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go fs.filer.KeepConnectedToMaster() v := viper.GetViper() - if !LoadConfiguration("filer", false) { + if !util.LoadConfiguration("filer", false) { v.Set("leveldb.enabled", true) v.Set("leveldb.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) @@ -68,7 +69,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) os.MkdirAll(option.DefaultLevelDbDir, 0755) } } - LoadConfiguration("notification", false) + util.LoadConfiguration("notification", false) fs.filer.LoadConfiguration(v) @@ -85,31 +86,3 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) return fs, nil } -func LoadConfiguration(configFileName string, required bool) (loaded bool) { - - // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - - glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - - if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - "\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n\n\n", - configFileName, configFileName, configFileName) - } else { - return false - } - } - - return true - -} diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index ad057f1d3..8477c3783 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -14,9 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" ) func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { @@ -233,37 +231,7 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error { - return StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) + return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) } -func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error { - - chunkViews := filer2.ViewFromChunks(chunks, offset, size) - - fileId2Url := make(map[string]string) - - for _, chunkView := range chunkViews { - - urlString, err := masterClient.LookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - fileId2Url[chunkView.FileId] = urlString - } - - for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - } - - return nil - -} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 506b9782d..0e41a7baa 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -125,7 +125,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.EcShards) > 0 { - glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) + glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) // broadcast the ec vid changes to master clients diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e78bd58dc..9ba9546fd 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,11 +2,17 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/shell" "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" + "os" + "regexp" + "strconv" + "strings" "sync" + "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -99,6 +105,8 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate) + ms.startAdminScripts() + return ms } @@ -153,3 +161,62 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ } } } + +func (ms *MasterServer) startAdminScripts() { + v := viper.GetViper() + adminScripts := v.GetString("master.maintenance.scripts") + + glog.V(0).Infof("adminScripts:\n%v", adminScripts) + if adminScripts == "" { + return + } + + scriptLines := strings.Split(adminScripts, "\n") + + masterAddress := "localhost:" + strconv.Itoa(ms.port) + + var shellOptions shell.ShellOptions + shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") + shellOptions.Masters = &masterAddress + shellOptions.FilerHost = "localhost" + shellOptions.FilerPort = 8888 + shellOptions.Directory = "/" + + commandEnv := shell.NewCommandEnv(shellOptions) + + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + go commandEnv.MasterClient.KeepConnectedToMaster() + + go func() { + commandEnv.MasterClient.WaitUntilConnected() + + c := time.Tick(17 * time.Second) + for _ = range c { + if ms.Topo.IsLeader() { + for _, line := range scriptLines { + + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + continue + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } + } + } + } + }() +} |
