aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/repeated_vacuum/repeated_vacuum.go8
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go3
-rw-r--r--weed/command/backup.go4
-rw-r--r--weed/command/benchmark.go8
-rw-r--r--weed/command/filer.go7
-rw-r--r--weed/command/filer_copy.go18
-rw-r--r--weed/command/filer_replication.go8
-rw-r--r--weed/command/master.go14
-rw-r--r--weed/command/mount_std.go3
-rw-r--r--weed/command/s3.go9
-rw-r--r--weed/command/scaffold.go23
-rw-r--r--weed/command/server.go10
-rw-r--r--weed/command/shell.go4
-rw-r--r--weed/command/upload.go9
-rw-r--r--weed/command/volume.go7
-rw-r--r--weed/command/webdav.go2
-rw-r--r--weed/filer2/stream.go41
-rw-r--r--weed/server/filer_server.go33
-rw-r--r--weed/server/filer_server_handlers_read.go34
-rw-r--r--weed/server/master_grpc_server.go2
-rw-r--r--weed/server/master_server.go67
-rw-r--r--weed/shell/command_collection_delete.go6
-rw-r--r--weed/shell/command_collection_list.go8
-rw-r--r--weed/shell/command_ec_balance.go16
-rw-r--r--weed/shell/command_ec_common.go6
-rw-r--r--weed/shell/command_ec_encode.go14
-rw-r--r--weed/shell/command_ec_rebuild.go10
-rw-r--r--weed/shell/command_fs_cat.go7
-rw-r--r--weed/shell/command_fs_cd.go4
-rw-r--r--weed/shell/command_fs_du.go6
-rw-r--r--weed/shell/command_fs_ls.go4
-rw-r--r--weed/shell/command_fs_meta_load.go4
-rw-r--r--weed/shell/command_fs_meta_notify.go8
-rw-r--r--weed/shell/command_fs_meta_save.go4
-rw-r--r--weed/shell/command_fs_pwd.go4
-rw-r--r--weed/shell/command_fs_tree.go4
-rw-r--r--weed/shell/command_volume_balance.go12
-rw-r--r--weed/shell/command_volume_copy.go4
-rw-r--r--weed/shell/command_volume_delete.go4
-rw-r--r--weed/shell/command_volume_fix_replication.go6
-rw-r--r--weed/shell/command_volume_list.go6
-rw-r--r--weed/shell/command_volume_mount.go4
-rw-r--r--weed/shell/command_volume_move.go4
-rw-r--r--weed/shell/command_volume_unmount.go4
-rw-r--r--weed/shell/commands.go33
-rw-r--r--weed/shell/shell_liner.go32
-rw-r--r--weed/util/config.go34
47 files changed, 337 insertions, 225 deletions
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 90bdeb5e8..28bcabb9b 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -4,12 +4,12 @@ import (
"bytes"
"flag"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
"log"
"math/rand"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -22,7 +22,7 @@ var (
func main() {
flag.Parse()
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
for i := 0; i < *repeat; i++ {
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
index 408f8dfec..f0ef51c09 100644
--- a/unmaintained/volume_tailer/volume_tailer.go
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
util2 "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
@@ -25,7 +24,7 @@ var (
func main() {
flag.Parse()
- weed_server.LoadConfiguration("security", false)
+ util2.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
vid := needle.VolumeId(*volumeId)
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 022e784c7..31e146965 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -4,8 +4,8 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -52,7 +52,7 @@ var cmdBackup = &Command{
func runBackup(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
if *s.volumeId == -1 {
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 6c64c4591..dd0fdb88e 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -4,9 +4,6 @@ import (
"bufio"
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
"io"
"math"
"math/rand"
@@ -18,6 +15,9 @@ import (
"sync"
"time"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -108,7 +108,7 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
diff --git a/weed/command/filer.go b/weed/command/filer.go
index d82781765..2aa022cd0 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -1,13 +1,14 @@
package command
import (
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
"net/http"
"strconv"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -77,7 +78,7 @@ var cmdFiler = &Command{
func runFiler(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
f.startFiler()
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index c18b9f055..8d8fead62 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -3,14 +3,6 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
"io"
"io/ioutil"
"net/http"
@@ -21,6 +13,14 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
)
var (
@@ -74,7 +74,7 @@ var cmdCopy = &Command{
func runCopy(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
if len(args) <= 1 {
return false
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 82576afe6..c6e7f5dba 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -13,7 +13,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
- "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
)
@@ -36,9 +36,9 @@ var cmdFilerReplicate = &Command{
func runFilerReplicate(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
- weed_server.LoadConfiguration("replication", true)
- weed_server.LoadConfiguration("notification", true)
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("replication", true)
+ util.LoadConfiguration("notification", true)
config := viper.GetViper()
var notificationInput sub.NotificationInput
diff --git a/weed/command/master.go b/weed/command/master.go
index 5a5bf2c0a..bda8493ed 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,6 +1,12 @@
package command
import (
+ "net/http"
+ "os"
+ "runtime"
+ "strconv"
+ "strings"
+
"github.com/chrislusf/raft/protobuf"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -10,11 +16,6 @@ import (
"github.com/gorilla/mux"
"github.com/spf13/viper"
"google.golang.org/grpc/reflection"
- "net/http"
- "os"
- "runtime"
- "strconv"
- "strings"
)
func init() {
@@ -56,7 +57,8 @@ var (
func runMaster(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("master", false)
if *mMaxCpu < 1 {
*mMaxCpu = runtime.NumCPU()
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 36c1f97a3..1d1214266 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -13,7 +13,6 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
"github.com/jacobsa/daemonize"
"github.com/spf13/viper"
@@ -45,7 +44,7 @@ func runMount(cmd *Command, args []string) bool {
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
allowOthers bool, ttlSec int, dirListingLimit int) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if dir == "" {
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 014342766..e004bb066 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,13 +1,14 @@
package command
import (
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
"net/http"
"time"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+
"fmt"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -47,7 +48,7 @@ var cmdS3 = &Command{
func runS3(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
return s3StandaloneOptions.startS3Server()
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index e24d7b56a..2282658bb 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -10,7 +10,7 @@ func init() {
}
var cmdScaffold = &Command{
- UsageLine: "scaffold -config=[filer|notification|replication|security]",
+ UsageLine: "scaffold -config=[filer|notification|replication|security|master]",
Short: "generate basic configuration files",
Long: `Generate filer.toml with all possible configurations for you to customize.
@@ -19,7 +19,7 @@ var cmdScaffold = &Command{
var (
outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory")
- config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security] the configuration file to generate")
+ config = cmdScaffold.Flag.String("config", "filer", "[filer|notification|replication|security|master] the configuration file to generate")
)
func runScaffold(cmd *Command, args []string) bool {
@@ -34,6 +34,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = REPLICATION_TOML_EXAMPLE
case "security":
content = SECURITY_TOML_EXAMPLE
+ case "master":
+ content = MASTER_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -310,4 +312,21 @@ key = ""
`
+
+ MASTER_TOML_EXAMPLE = `
+# Put this file to one of the location, with descending priority
+# ./master.toml
+# $HOME/.seaweedfs/master.toml
+# /etc/seaweedfs/master.toml
+# this file is read by master
+
+[master.maintenance]
+scripts = """
+ ec.encode -fullPercent=95 -quietFor=1h
+ ec.rebuild -force
+ ec.balance -force
+ volume.balance -force
+"""
+
+`
)
diff --git a/weed/command/server.go b/weed/command/server.go
index 630ba72a7..437b0ad83 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,9 +2,6 @@ package command
import (
"fmt"
- "github.com/chrislusf/raft/protobuf"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
"net/http"
"os"
"runtime"
@@ -14,6 +11,10 @@ import (
"sync"
"time"
+ "github.com/chrislusf/raft/protobuf"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -107,7 +108,8 @@ func init() {
func runServer(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
+ util.LoadConfiguration("master", false)
if *serverOptions.cpuprofile != "" {
f, err := os.Create(*serverOptions.cpuprofile)
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 95b62f0b5..79f8b8bf9 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -2,8 +2,8 @@ package command
import (
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/shell"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
)
@@ -28,7 +28,7 @@ var ()
func runShell(command *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
shellOptions.FilerHost = "localhost"
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 1271725ba..25e938d9b 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -3,12 +3,13 @@ package command
import (
"encoding/json"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
"os"
"path/filepath"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/operation"
)
@@ -61,7 +62,7 @@ var cmdUpload = &Command{
func runUpload(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
if len(args) == 0 {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 4e350b08d..c775ac5cf 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,8 +1,6 @@
package command
import (
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
"net/http"
"os"
"runtime"
@@ -11,6 +9,9 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
@@ -83,7 +84,7 @@ var (
func runVolume(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
if *v.maxCpu < 1 {
*v.maxCpu = runtime.NumCPU()
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 125893dfa..371c4a9ad 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -45,7 +45,7 @@ var cmdWebDav = &Command{
func runWebDav(cmd *Command, args []string) bool {
- weed_server.LoadConfiguration("security", false)
+ util.LoadConfiguration("security", false)
glog.V(0).Infof("Starting Seaweed WebDav Server %s at https port %d", util.VERSION, *webDavStandaloneOptions.port)
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
new file mode 100644
index 000000000..01b87cad1
--- /dev/null
+++ b/weed/filer2/stream.go
@@ -0,0 +1,41 @@
+package filer2
+
+import (
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error {
+
+ chunkViews := ViewFromChunks(chunks, offset, size)
+
+ fileId2Url := make(map[string]string)
+
+ for _, chunkView := range chunkViews {
+
+ urlString, err := masterClient.LookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ fileId2Url[chunkView.FileId] = urlString
+ }
+
+ for _, chunkView := range chunkViews {
+ urlString := fileId2Url[chunkView.FileId]
+ _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ w.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ }
+
+ return nil
+
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 40be2d7cf..5d2a54f4d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -4,6 +4,7 @@ import (
"net/http"
"os"
+ "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2"
@@ -60,7 +61,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go fs.filer.KeepConnectedToMaster()
v := viper.GetViper()
- if !LoadConfiguration("filer", false) {
+ if !util.LoadConfiguration("filer", false) {
v.Set("leveldb.enabled", true)
v.Set("leveldb.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
@@ -68,7 +69,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
}
- LoadConfiguration("notification", false)
+ util.LoadConfiguration("notification", false)
fs.filer.LoadConfiguration(v)
@@ -85,31 +86,3 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
return fs, nil
}
-func LoadConfiguration(configFileName string, required bool) (loaded bool) {
-
- // find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
-
- glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
-
- if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
- if required {
- glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a filer.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
- " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
- "\nOr use this command to generate the default toml file\n"+
- " weed scaffold -config=%s -output=.\n\n\n",
- configFileName, configFileName, configFileName)
- } else {
- return false
- }
- }
-
- return true
-
-}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index ad057f1d3..8477c3783 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -14,9 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
)
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
@@ -233,37 +231,7 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
- return StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
+ return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
}
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int) error {
-
- chunkViews := filer2.ViewFromChunks(chunks, offset, size)
-
- fileId2Url := make(map[string]string)
-
- for _, chunkView := range chunkViews {
-
- urlString, err := masterClient.LookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- fileId2Url[chunkView.FileId] = urlString
- }
-
- for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- }
-
- return nil
-
-}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 506b9782d..0e41a7baa 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -125,7 +125,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.EcShards) > 0 {
- glog.V(0).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index e78bd58dc..9ba9546fd 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,11 +2,17 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/shell"
"google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
+ "time"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -99,6 +105,8 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate)
+ ms.startAdminScripts()
+
return ms
}
@@ -153,3 +161,62 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
}
}
}
+
+func (ms *MasterServer) startAdminScripts() {
+ v := viper.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
+
+ glog.V(0).Infof("adminScripts:\n%v", adminScripts)
+ if adminScripts == "" {
+ return
+ }
+
+ scriptLines := strings.Split(adminScripts, "\n")
+
+ masterAddress := "localhost:" + strconv.Itoa(ms.port)
+
+ var shellOptions shell.ShellOptions
+ shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.Masters = &masterAddress
+ shellOptions.FilerHost = "localhost"
+ shellOptions.FilerPort = 8888
+ shellOptions.Directory = "/"
+
+ commandEnv := shell.NewCommandEnv(shellOptions)
+
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+
+ go func() {
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ c := time.Tick(17 * time.Second)
+ for _ = range c {
+ if ms.Topo.IsLeader() {
+ for _, line := range scriptLines {
+
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ continue
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ }()
+}
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index e3356aba4..fbaddcd51 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -8,7 +8,7 @@ import (
)
func init() {
- commands = append(commands, &commandCollectionDelete{})
+ Commands = append(Commands, &commandCollectionDelete{})
}
type commandCollectionDelete struct {
@@ -26,7 +26,7 @@ func (c *commandCollectionDelete) Help() string {
`
}
-func (c *commandCollectionDelete) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) == 0 {
return nil
@@ -35,7 +35,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *commandEnv, writ
collectionName := args[0]
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index d1124adde..c4325c66f 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -8,7 +8,7 @@ import (
)
func init() {
- commands = append(commands, &commandCollectionList{})
+ Commands = append(Commands, &commandCollectionList{})
}
type commandCollectionList struct {
@@ -22,7 +22,7 @@ func (c *commandCollectionList) Help() string {
return `list all collections`
}
-func (c *commandCollectionList) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
collections, err := ListCollectionNames(commandEnv, true, true)
@@ -39,10 +39,10 @@ func (c *commandCollectionList) Do(args []string, commandEnv *commandEnv, writer
return nil
}
-func ListCollectionNames(commandEnv *commandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
+func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 6d44a184b..424b63d9d 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -13,7 +13,7 @@ import (
)
func init() {
- commands = append(commands, &commandEcBalance{})
+ Commands = append(Commands, &commandEcBalance{})
}
type commandEcBalance struct {
@@ -53,7 +53,7 @@ func (c *commandEcBalance) Help() string {
`
}
-func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
@@ -65,7 +65,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.W
var resp *master_pb.VolumeListResponse
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
@@ -104,7 +104,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *commandEnv, writer io.W
return nil
}
-func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing bool) error {
+func balanceEcVolumes(commandEnv *CommandEnv, collection string, applyBalancing bool) error {
ctx := context.Background()
@@ -142,7 +142,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
return nil
}
-func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error {
+func doBalanceEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error {
// collect all ec nodes with at least one free slot
var possibleDestinationEcNodes []*EcNode
for _, ecNode := range allEcNodes {
@@ -171,7 +171,7 @@ func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection s
return nil
}
-func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
+func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
// check whether this volume has ecNodes that are over average
shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
@@ -205,7 +205,7 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collecti
return nil
}
-func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
for _, ecNode := range existingLocations {
@@ -232,7 +232,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv,
return nil
}
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodes(possibleDestinationEcNodes)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index b5560b560..4c53ba43b 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -14,7 +14,7 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
@@ -124,11 +124,11 @@ type EcNode struct {
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index e0d4cf380..8b01f6cfc 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -18,7 +18,7 @@ import (
)
func init() {
- commands = append(commands, &commandEcEncode{})
+ Commands = append(Commands, &commandEcEncode{})
}
type commandEcEncode struct {
@@ -51,7 +51,7 @@ func (c *commandEcEncode) Help() string {
`
}
-func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
@@ -85,9 +85,9 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
return nil
}
-func doEcEncode(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
- locations := commandEnv.masterClient.GetLocations(uint32(vid))
+ locations := commandEnv.MasterClient.GetLocations(uint32(vid))
if len(locations) == 0 {
return fmt.Errorf("volume %d not found", vid)
}
@@ -121,7 +121,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
}
-func spreadEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv)
if err != nil {
@@ -228,10 +228,10 @@ func balancedEcDistribution(servers []*EcNode) (allocated []int) {
return allocated
}
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 4b82f96c0..9a5ea3ca9 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -14,7 +14,7 @@ import (
)
func init() {
- commands = append(commands, &commandEcRebuild{})
+ Commands = append(Commands, &commandEcRebuild{})
}
type commandEcRebuild struct {
@@ -54,7 +54,7 @@ func (c *commandEcRebuild) Help() string {
`
}
-func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
@@ -90,7 +90,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *commandEnv, writer io.W
return nil
}
-func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
+func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
ctx := context.Background()
@@ -125,7 +125,7 @@ func rebuildEcVolumes(commandEnv *commandEnv, allEcNodes []*EcNode, collection s
return nil
}
-func rebuildOneEcVolume(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
@@ -182,7 +182,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
return
}
-func prepareDataToRecover(ctx context.Context, commandEnv *commandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
needEcxFile := true
var localShardBits erasure_coding.ShardBits
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index a6c308d2a..66ced46c5 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -8,11 +8,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
)
func init() {
- commands = append(commands, &commandFsCat{})
+ Commands = append(Commands, &commandFsCat{})
}
type commandFsCat struct {
@@ -34,7 +33,7 @@ func (c *commandFsCat) Help() string {
`
}
-func (c *commandFsCat) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
input := findInputDirectory(args)
@@ -62,7 +61,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *commandEnv, writer io.Write
return err
}
- return weed_server.StreamContent(commandEnv.masterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32)
+ return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32)
})
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index f14350f02..408ec86c8 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -6,7 +6,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsCd{})
+ Commands = append(Commands, &commandFsCd{})
}
type commandFsCd struct {
@@ -29,7 +29,7 @@ func (c *commandFsCd) Help() string {
`
}
-func (c *commandFsCd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
input := findInputDirectory(args)
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index f305cabdc..5e634c82a 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -11,7 +11,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsDu{})
+ Commands = append(Commands, &commandFsDu{})
}
type commandFsDu struct {
@@ -30,7 +30,7 @@ func (c *commandFsDu) Help() string {
`
}
-func (c *commandFsDu) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
@@ -106,7 +106,7 @@ func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.Se
}
-func (env *commandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 5c5130c53..6979635e1 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -13,7 +13,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsLs{})
+ Commands = append(Commands, &commandFsLs{})
}
type commandFsLs struct {
@@ -35,7 +35,7 @@ func (c *commandFsLs) Help() string {
`
}
-func (c *commandFsLs) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var isLongFormat, showHidden bool
for _, arg := range args {
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 2c5d19015..5ea8de9f5 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -13,7 +13,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsMetaLoad{})
+ Commands = append(Commands, &commandFsMetaLoad{})
}
type commandFsMetaLoad struct {
@@ -31,7 +31,7 @@ func (c *commandFsMetaLoad) Help() string {
`
}
-func (c *commandFsMetaLoad) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) == 0 {
fmt.Fprintf(writer, "missing a metadata file\n")
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index ca4d8da5b..13b272fbf 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -8,12 +8,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/spf13/viper"
)
func init() {
- commands = append(commands, &commandFsMetaNotify{})
+ Commands = append(Commands, &commandFsMetaNotify{})
}
type commandFsMetaNotify struct {
@@ -33,14 +33,14 @@ func (c *commandFsMetaNotify) Help() string {
`
}
-func (c *commandFsMetaNotify) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- weed_server.LoadConfiguration("notification", true)
+ util.LoadConfiguration("notification", true)
v := viper.GetViper()
notification.LoadConfiguration(v.Sub("notification"))
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 47426a858..6ca395fae 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -14,7 +14,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsMetaSave{})
+ Commands = append(Commands, &commandFsMetaSave{})
}
type commandFsMetaSave struct {
@@ -40,7 +40,7 @@ func (c *commandFsMetaSave) Help() string {
`
}
-func (c *commandFsMetaSave) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go
index 0b0a7f176..084a5e90a 100644
--- a/weed/shell/command_fs_pwd.go
+++ b/weed/shell/command_fs_pwd.go
@@ -6,7 +6,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsPwd{})
+ Commands = append(Commands, &commandFsPwd{})
}
type commandFsPwd struct {
@@ -20,7 +20,7 @@ func (c *commandFsPwd) Help() string {
return `print out current directory`
}
-func (c *commandFsPwd) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fmt.Fprintf(writer, "http://%s:%d%s\n",
commandEnv.option.FilerHost,
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index f1ffc9e4b..8474e43ea 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -10,7 +10,7 @@ import (
)
func init() {
- commands = append(commands, &commandFsTree{})
+ Commands = append(Commands, &commandFsTree{})
}
type commandFsTree struct {
@@ -27,7 +27,7 @@ func (c *commandFsTree) Help() string {
`
}
-func (c *commandFsTree) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 683e2dbe8..1ae031658 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -15,7 +15,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeBalance{})
+ Commands = append(Commands, &commandVolumeBalance{})
}
type commandVolumeBalance struct {
@@ -59,7 +59,7 @@ func (c *commandVolumeBalance) Help() string {
`
}
-func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
@@ -71,7 +71,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer
var resp *master_pb.VolumeListResponse
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
@@ -108,7 +108,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *commandEnv, writer
return nil
}
-func balanceVolumeServers(commandEnv *commandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
var nodes []*Node
for _, dn := range dataNodeInfos {
nodes = append(nodes, &Node{
@@ -181,7 +181,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
-func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error {
+func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error {
selectedVolumeCount := 0
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
@@ -223,7 +223,7 @@ func balanceSelectedVolume(commandEnv *commandEnv, nodes []*Node, sortCandidates
return nil
}
-func moveVolume(commandEnv *commandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
+func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
collectionPrefix := v.Collection + "_"
if v.Collection == "" {
collectionPrefix = ""
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 8f14e25b3..1c83ba655 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -9,7 +9,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeCopy{})
+ Commands = append(Commands, &commandVolumeCopy{})
}
type commandVolumeCopy struct {
@@ -30,7 +30,7 @@ func (c *commandVolumeCopy) Help() string {
`
}
-func (c *commandVolumeCopy) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 3 {
fmt.Fprintf(writer, "received args: %+v\n", args)
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 748c89eed..17d27ea3a 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -9,7 +9,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeDelete{})
+ Commands = append(Commands, &commandVolumeDelete{})
}
type commandVolumeDelete struct {
@@ -29,7 +29,7 @@ func (c *commandVolumeDelete) Help() string {
`
}
-func (c *commandVolumeDelete) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index cff4adf89..09e1c19eb 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -13,7 +13,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeFixReplication{})
+ Commands = append(Commands, &commandVolumeFixReplication{})
}
type commandVolumeFixReplication struct {
@@ -41,7 +41,7 @@ func (c *commandVolumeFixReplication) Help() string {
`
}
-func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
takeAction := true
if len(args) > 0 && args[0] == "-n" {
@@ -50,7 +50,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv,
var resp *master_pb.VolumeListResponse
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index a402c1e51..134580ffe 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -11,7 +11,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeList{})
+ Commands = append(Commands, &commandVolumeList{})
}
type commandVolumeList struct {
@@ -29,11 +29,11 @@ func (c *commandVolumeList) Help() string {
`
}
-func (c *commandVolumeList) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var resp *master_pb.VolumeListResponse
ctx := context.Background()
- err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 37dd7765f..50a307492 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -12,7 +12,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeMount{})
+ Commands = append(Commands, &commandVolumeMount{})
}
type commandVolumeMount struct {
@@ -32,7 +32,7 @@ func (c *commandVolumeMount) Help() string {
`
}
-func (c *commandVolumeMount) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 03b1446e6..08d87c988 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -14,7 +14,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeMove{})
+ Commands = append(Commands, &commandVolumeMove{})
}
type commandVolumeMove struct {
@@ -42,7 +42,7 @@ func (c *commandVolumeMove) Help() string {
`
}
-func (c *commandVolumeMove) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 3 {
fmt.Fprintf(writer, "received args: %+v\n", args)
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index ce9f666af..8096f34d8 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -12,7 +12,7 @@ import (
)
func init() {
- commands = append(commands, &commandVolumeUnmount{})
+ Commands = append(Commands, &commandVolumeUnmount{})
}
type commandVolumeUnmount struct {
@@ -32,7 +32,7 @@ func (c *commandVolumeUnmount) Help() string {
`
}
-func (c *commandVolumeUnmount) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 6fade6e9d..ade8acd89 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -3,15 +3,16 @@ package shell
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
- "google.golang.org/grpc"
"io"
"net/url"
"path/filepath"
"strconv"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "google.golang.org/grpc"
)
type ShellOptions struct {
@@ -23,23 +24,33 @@ type ShellOptions struct {
Directory string
}
-type commandEnv struct {
+type CommandEnv struct {
env map[string]string
- masterClient *wdclient.MasterClient
+ MasterClient *wdclient.MasterClient
option ShellOptions
}
type command interface {
Name() string
Help() string
- Do([]string, *commandEnv, io.Writer) error
+ Do([]string, *CommandEnv, io.Writer) error
}
var (
- commands = []command{}
+ Commands = []command{}
)
-func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) {
+
+func NewCommandEnv(options ShellOptions) *CommandEnv {
+ return &CommandEnv{
+ env: make(map[string]string),
+ MasterClient: wdclient.NewMasterClient(context.Background(),
+ options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
+ option: options,
+ }
+}
+
+func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) {
if strings.HasPrefix(input, "http") {
return parseFilerUrl(input)
}
@@ -49,13 +60,13 @@ func (ce *commandEnv) parseUrl(input string) (filerServer string, filerPort int6
return ce.option.FilerHost, ce.option.FilerPort, input, err
}
-func (ce *commandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil
}
-func (ce *commandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
dir, name := filer2.FullPath(path).DirAndName()
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 2c3058cbf..a4f17e0fa 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -1,17 +1,16 @@
package shell
import (
- "context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"os"
"path"
"regexp"
"strings"
- "github.com/peterh/liner"
"sort"
+
+ "github.com/peterh/liner"
)
var (
@@ -33,15 +32,10 @@ func RunShell(options ShellOptions) {
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
- commandEnv := &commandEnv{
- env: make(map[string]string),
- masterClient: wdclient.NewMasterClient(context.Background(),
- options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
- option: options,
- }
+ commandEnv := NewCommandEnv(options)
- go commandEnv.masterClient.KeepConnectedToMaster()
- commandEnv.masterClient.WaitUntilConnected()
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+ commandEnv.MasterClient.WaitUntilConnected()
for {
cmd, err := line.Prompt("> ")
@@ -71,7 +65,7 @@ func RunShell(options ShellOptions) {
return
} else {
foundCommand := false
- for _, c := range commands {
+ for _, c := range Commands {
if c.Name() == cmd {
if err := c.Do(args, commandEnv, os.Stdout); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
@@ -94,10 +88,10 @@ func printGenericHelp() {
`
fmt.Print(msg)
- sort.Slice(commands, func(i, j int) bool {
- return strings.Compare(commands[i].Name(), commands[j].Name()) < 0
+ sort.Slice(Commands, func(i, j int) bool {
+ return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0
})
- for _, c := range commands {
+ for _, c := range Commands {
helpTexts := strings.SplitN(c.Help(), "\n", 2)
fmt.Printf(" %-30s\t# %s \n", c.Name(), helpTexts[0])
}
@@ -112,11 +106,11 @@ func printHelp(cmds []string) {
} else {
cmd := strings.ToLower(args[0])
- sort.Slice(commands, func(i, j int) bool {
- return strings.Compare(commands[i].Name(), commands[j].Name()) < 0
+ sort.Slice(Commands, func(i, j int) bool {
+ return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0
})
- for _, c := range commands {
+ for _, c := range Commands {
if c.Name() == cmd {
fmt.Printf(" %s\t# %s\n", c.Name(), c.Help())
}
@@ -126,7 +120,7 @@ func printHelp(cmds []string) {
func setCompletionHandler() {
line.SetCompleter(func(line string) (c []string) {
- for _, i := range commands {
+ for _, i := range Commands {
if strings.HasPrefix(i.Name(), strings.ToLower(line)) {
c = append(c, i.Name())
}
diff --git a/weed/util/config.go b/weed/util/config.go
index 77cab3019..1ea833d1f 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -1,5 +1,10 @@
package util
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+)
+
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
@@ -8,3 +13,32 @@ type Configuration interface {
GetFloat64(key string) float64
GetStringSlice(key string) []string
}
+
+func LoadConfiguration(configFileName string, required bool) (loaded bool) {
+
+ // find a filer store
+ viper.SetConfigName(configFileName) // name of config file (without extension)
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+
+ glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
+
+ if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
+ glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+ if required {
+ glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
+ "\n\nPlease follow this example and add a filer.toml file to "+
+ "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
+ " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
+ "\nOr use this command to generate the default toml file\n"+
+ " weed scaffold -config=%s -output=.\n\n\n",
+ configFileName, configFileName, configFileName)
+ } else {
+ return false
+ }
+ }
+
+ return true
+
+}