aboutsummaryrefslogtreecommitdiff
path: root/go/weed
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2015-01-10 02:51:26 +0800
committeryourchanges <yourchanges@gmail.com>2015-01-10 02:51:26 +0800
commit9601880e323bbdf9540f2c79fb21d66374245b50 (patch)
tree14fd3b36a89955ec6e0be6d51186031e978b519d /go/weed
parentf7bcd8e958ef185baeca0c455a397d49fcb62256 (diff)
parent2c1a846279c172bcae457e70efa142c29a18892e (diff)
downloadseaweedfs-9601880e323bbdf9540f2c79fb21d66374245b50.tar.xz
seaweedfs-9601880e323bbdf9540f2c79fb21d66374245b50.zip
Merge pull request #2 from chrislusf/master
merge
Diffstat (limited to 'go/weed')
-rw-r--r--go/weed/benchmark.go277
-rw-r--r--go/weed/compact.go2
-rw-r--r--go/weed/download.go5
-rw-r--r--go/weed/export.go7
-rw-r--r--go/weed/filer.go32
-rw-r--r--go/weed/fix.go7
-rw-r--r--go/weed/master.go16
-rw-r--r--go/weed/mount.go2
-rw-r--r--go/weed/mount_std.go7
-rw-r--r--go/weed/server.go43
-rw-r--r--go/weed/shell.go3
-rw-r--r--go/weed/signal_handling_notsupported.go2
-rw-r--r--go/weed/upload.go3
-rw-r--r--go/weed/version.go3
-rw-r--r--go/weed/volume.go16
-rw-r--r--go/weed/volume_test.go3
-rw-r--r--go/weed/weed.go3
-rw-r--r--go/weed/weed_server/common.go66
-rw-r--r--go/weed/weed_server/filer_server.go53
-rw-r--r--go/weed/weed_server/filer_server_handlers.go49
-rw-r--r--go/weed/weed_server/filer_server_handlers_admin.go5
-rw-r--r--go/weed/weed_server/master_server.go45
-rw-r--r--go/weed/weed_server/master_server_handlers.go33
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go44
-rw-r--r--go/weed/weed_server/raft_server.go56
-rw-r--r--go/weed/weed_server/raft_server_handlers.go9
-rw-r--r--go/weed/weed_server/volume_server.go36
-rw-r--r--go/weed/weed_server/volume_server_handlers.go50
-rw-r--r--go/weed/weed_server/volume_server_handlers_admin.go21
-rw-r--r--go/weed/weed_server/volume_server_handlers_vacuum.go15
30 files changed, 508 insertions, 405 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index fec8472e5..f4f0b1874 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -2,9 +2,6 @@ package main
import (
"bufio"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"math"
@@ -16,6 +13,10 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
)
type BenchmarkOptions struct {
@@ -30,11 +31,14 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
cpuprofile *string
+ maxCpu *int
vid2server map[string]string //cache for vid locations
+
}
var (
- b BenchmarkOptions
+ b BenchmarkOptions
+ sharedBytes []byte
)
func init() {
@@ -50,33 +54,35 @@ func init() {
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
- b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
+ b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
+ b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.vid2server = make(map[string]string)
+ sharedBytes = make([]byte, 1024)
}
var cmdBenchmark = &Command{
UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty weed file system.
-
+
Two tests during benchmark:
1) write lots of small files to the system
2) read the files out
-
+
The file content is mostly zero, but no compression is done.
-
+
You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test.
-
+
Write speed and read speed will be collected.
The numbers are used to get a sense of the system.
Usually your network or the hard drive is the real bottleneck.
-
+
Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of
- benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
+ benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5
@@ -87,18 +93,17 @@ var cmdBenchmark = &Command{
}
var (
- wait sync.WaitGroup
- writeStats *stats
- readStats *stats
- serverLimitChan map[string]chan bool
+ wait sync.WaitGroup
+ writeStats *stats
+ readStats *stats
)
-func init() {
- serverLimitChan = make(map[string]chan bool)
-}
-
func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ if *b.maxCpu < 1 {
+ *b.maxCpu = runtime.NumCPU()
+ }
+ runtime.GOMAXPROCS(*b.maxCpu)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
@@ -122,12 +127,12 @@ func runbenchmark(cmd *Command, args []string) bool {
func bench_write() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- writeStats = newStats()
+ writeStats = newStats(*b.concurrency)
idChan := make(chan int)
- wait.Add(*b.concurrency)
go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
for i := 0; i < *b.concurrency; i++ {
- go writeFiles(idChan, fileIdLineChan, writeStats)
+ wait.Add(1)
+ go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
}
writeStats.start = time.Now()
writeStats.total = *b.numberOfFiles
@@ -138,28 +143,30 @@ func bench_write() {
close(idChan)
wait.Wait()
writeStats.end = time.Now()
- wait.Add(1)
+ wait.Add(2)
finishChan <- true
finishChan <- true
- close(finishChan)
wait.Wait()
+ close(finishChan)
writeStats.printStats()
}
func bench_read() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- readStats = newStats()
- wait.Add(*b.concurrency)
+ readStats = newStats(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now()
readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ {
- go readFiles(fileIdLineChan, readStats)
+ wait.Add(1)
+ go readFiles(fileIdLineChan, &readStats.localStats[i])
}
wait.Wait()
+ wait.Add(1)
finishChan <- true
+ wait.Wait()
close(finishChan)
readStats.end = time.Now()
readStats.printStats()
@@ -170,126 +177,102 @@ type delayedFile struct {
fp *operation.FilePart
}
-func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
+func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup
for i := 0; i < 7; i++ {
+ waitForDeletions.Add(1)
go func() {
- waitForDeletions.Add(1)
+ defer waitForDeletions.Done()
for df := range delayedDeleteChan {
- if df == nil {
- break
- }
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
}
- fp := df.fp
- serverLimitChan[fp.Server] <- true
- if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
+ if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
s.completed++
} else {
s.failed++
}
- <-serverLimitChan[fp.Server]
}
- waitForDeletions.Done()
}()
}
- for {
- if id, ok := <-idChan; ok {
- start := time.Now()
- fileSize := int64(*b.fileSize + rand.Intn(64))
- fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
- if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
- fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
- if _, ok := serverLimitChan[fp.Server]; !ok {
- serverLimitChan[fp.Server] = make(chan bool, 7)
- }
- serverLimitChan[fp.Server] <- true
- if _, err := fp.Upload(0, *b.server); err == nil {
- if rand.Intn(100) < *b.deletePercentage {
- s.total++
- delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
- } else {
- fileIdLineChan <- fp.Fid
- }
- s.completed++
- s.transferred += fileSize
+ for id := range idChan {
+ start := time.Now()
+ fileSize := int64(*b.fileSize + rand.Intn(64))
+ fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
+ if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
+ fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
+ if _, err := fp.Upload(0, *b.server); err == nil {
+ if rand.Intn(100) < *b.deletePercentage {
+ s.total++
+ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else {
- s.failed++
- }
- writeStats.addSample(time.Now().Sub(start))
- <-serverLimitChan[fp.Server]
- if *cmdBenchmark.IsDebug {
- fmt.Printf("writing %d file %s\n", id, fp.Fid)
+ fileIdLineChan <- fp.Fid
}
+ s.completed++
+ s.transferred += fileSize
} else {
s.failed++
- println("writing file error:", err.Error())
+ fmt.Printf("Failed to write with error:%v\n", err)
+ }
+ writeStats.addSample(time.Now().Sub(start))
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("writing %d file %s\n", id, fp.Fid)
}
} else {
- break
+ s.failed++
+ println("writing file error:", err.Error())
}
}
close(delayedDeleteChan)
waitForDeletions.Wait()
- wait.Done()
}
-func readFiles(fileIdLineChan chan string, s *stats) {
- serverLimitChan := make(map[string]chan bool)
+func readFiles(fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
masterLimitChan := make(chan bool, 1)
- for {
- if fid, ok := <-fileIdLineChan; ok {
- if len(fid) == 0 {
- continue
- }
- if fid[0] == '#' {
- continue
- }
- if *cmdBenchmark.IsDebug {
- fmt.Printf("reading file %s\n", fid)
- }
- parts := strings.SplitN(fid, ",", 2)
- vid := parts[0]
- start := time.Now()
- if server, ok := b.vid2server[vid]; !ok {
- masterLimitChan <- true
- if _, now_ok := b.vid2server[vid]; !now_ok {
- if ret, err := operation.Lookup(*b.server, vid); err == nil {
- if len(ret.Locations) > 0 {
- server = ret.Locations[0].PublicUrl
- b.vid2server[vid] = server
- }
+ for fid := range fileIdLineChan {
+ if len(fid) == 0 {
+ continue
+ }
+ if fid[0] == '#' {
+ continue
+ }
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("reading file %s\n", fid)
+ }
+ parts := strings.SplitN(fid, ",", 2)
+ vid := parts[0]
+ start := time.Now()
+ if server, ok := b.vid2server[vid]; !ok {
+ masterLimitChan <- true
+ if _, now_ok := b.vid2server[vid]; !now_ok {
+ if ret, err := operation.Lookup(*b.server, vid); err == nil {
+ if len(ret.Locations) > 0 {
+ server = ret.Locations[0].PublicUrl
+ b.vid2server[vid] = server
}
}
- <-masterLimitChan
}
- if server, ok := b.vid2server[vid]; ok {
- if _, ok := serverLimitChan[server]; !ok {
- serverLimitChan[server] = make(chan bool, 7)
- }
- serverLimitChan[server] <- true
- url := "http://" + server + "/" + fid
- if bytesRead, err := util.Get(url); err == nil {
- s.completed++
- s.transferred += int64(len(bytesRead))
- readStats.addSample(time.Now().Sub(start))
- } else {
- s.failed++
- println("!!!! Failed to read from ", url, " !!!!!")
- }
- <-serverLimitChan[server]
+ <-masterLimitChan
+ }
+ if server, ok := b.vid2server[vid]; ok {
+ url := "http://" + server + "/" + fid
+ if bytesRead, err := util.Get(url); err == nil {
+ s.completed++
+ s.transferred += int64(len(bytesRead))
+ readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- println("!!!! volume id ", vid, " location not found!!!!!")
+ fmt.Printf("Failed to read %s error:%v\n", url, err)
}
} else {
- break
+ s.failed++
+ println("!!!! volume id ", vid, " location not found!!!!!")
}
}
- wait.Done()
}
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
@@ -353,20 +336,28 @@ const (
// An efficient statics collecting and rendering
type stats struct {
- data []int
- overflow []int
+ data []int
+ overflow []int
+ localStats []stat
+ start time.Time
+ end time.Time
+ total int
+}
+type stat struct {
completed int
failed int
total int
transferred int64
- start time.Time
- end time.Time
}
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
-func newStats() *stats {
- return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
+func newStats(n int) *stats {
+ return &stats{
+ data: make([]int, benchResolution),
+ overflow: make([]int, 0),
+ localStats: make([]stat, n),
+ }
}
func (s *stats) addSample(d time.Duration) {
@@ -387,28 +378,41 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
for {
select {
case <-finishChan:
+ wait.Done()
return
case t := <-ticker:
- completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
+ completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
- s.completed, s.total, float64(s.completed)*100/float64(s.total),
- float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
- float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
+ completed, total, float64(completed)*100/float64(total),
+ float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
+ float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
)
- lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
+ lastCompleted, lastTransferred, lastTime = completed, transferred, t
}
}
}
func (s *stats) printStats() {
+ completed, failed, transferred, total := 0, 0, int64(0), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ failed += localStat.failed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
- fmt.Printf("Complete requests: %d\n", s.completed)
- fmt.Printf("Failed requests: %d\n", s.failed)
- fmt.Printf("Total transferred: %d bytes\n", s.transferred)
- fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
- fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
+ fmt.Printf("Complete requests: %d\n", completed)
+ fmt.Printf("Failed requests: %d\n", failed)
+ fmt.Printf("Total transferred: %d bytes\n", transferred)
+ fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
+ fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
n, sum := 0, 0
min, max := 10000000, 0
for i := 0; i < len(s.data); i++ {
@@ -496,15 +500,32 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
} else {
n = len(p)
}
- for i := 0; i < n-8; i += 8 {
- for s := uint(0); s < 8; s++ {
- p[i] = byte(l.id >> (s * 8))
+ if n >= 8 {
+ for i := 0; i < 8; i++ {
+ p[i] = byte(l.id >> uint(i*8))
}
}
l.size -= int64(n)
return
}
+func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
+ size := int(l.size)
+ bufferSize := len(sharedBytes)
+ for size > 0 {
+ tempBuffer := sharedBytes
+ if size < bufferSize {
+ tempBuffer = sharedBytes[0:size]
+ }
+ count, e := w.Write(tempBuffer)
+ if e != nil {
+ return int64(size), e
+ }
+ size -= count
+ }
+ return l.size, nil
+}
+
func Readln(r *bufio.Reader) ([]byte, error) {
var (
isPrefix bool = true
diff --git a/go/weed/compact.go b/go/weed/compact.go
index a99e6c93e..71c4ea90f 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -12,7 +12,7 @@ func init() {
var cmdCompact = &Command{
UsageLine: "compact -dir=/tmp -volumeId=234",
- Short: "run weed tool compact on volume file if corrupted",
+ Short: "run weed tool compact on volume file",
Long: `Force an compaction to remove deleted files from volume files.
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
diff --git a/go/weed/download.go b/go/weed/download.go
index c30d17915..c782654f5 100644
--- a/go/weed/download.go
+++ b/go/weed/download.go
@@ -1,14 +1,15 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
)
var (
diff --git a/go/weed/export.go b/go/weed/export.go
index 81bc21f6e..c9cc0e3fe 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -3,8 +3,6 @@ package main
import (
"archive/tar"
"bytes"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"fmt"
"os"
"path"
@@ -12,6 +10,9 @@ import (
"strings"
"text/template"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
func init() {
@@ -36,7 +37,7 @@ var cmdExport = &Command{
var (
exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
exportCollection = cmdExport.Flag.String("collection", "", "the volume collection name")
- exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+ exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
tarFh *tar.Writer
diff --git a/go/weed/filer.go b/go/weed/filer.go
index 7dbecb4d0..5b3fd2b67 100644
--- a/go/weed/filer.go
+++ b/go/weed/filer.go
@@ -1,13 +1,14 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/util"
- "github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"strconv"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
)
var (
@@ -20,6 +21,11 @@ type FilerOptions struct {
collection *string
defaultReplicaPlacement *string
dir *string
+ redirectOnRead *bool
+ cassandra_server *string
+ cassandra_keyspace *string
+ redis_server *string
+ redis_database *int
}
func init() {
@@ -28,14 +34,19 @@ func init() {
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
- f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.")
+ f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
+ f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
+ f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
+ f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
+ f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
+ f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
}
var cmdFiler = &Command{
UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
Short: "start a file server that points to a master server",
Long: `start a file server which accepts REST operation for any files.
-
+
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
//get the file content
@@ -44,22 +55,27 @@ var cmdFiler = &Command{
POST /path/to/
//return a json format subdirectory and files listing
GET /path/to/
-
+
Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
It should be highly scalable to hundreds of millions of files on a modest machine.
-
+
Future we will ensure it can avoid of being SPOF.
`,
}
func runFiler(cmd *Command, args []string) bool {
+
if err := util.TestFolderWritable(*f.dir); err != nil {
glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err)
}
r := http.NewServeMux()
- _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection)
+ _, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
+ *f.defaultReplicaPlacement, *f.redirectOnRead,
+ *f.cassandra_server, *f.cassandra_keyspace,
+ *f.redis_server, *f.redis_database,
+ )
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
diff --git a/go/weed/fix.go b/go/weed/fix.go
index ad573875a..e66075ed2 100644
--- a/go/weed/fix.go
+++ b/go/weed/fix.go
@@ -1,11 +1,12 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"os"
"path"
"strconv"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
func init() {
@@ -16,7 +17,7 @@ func init() {
var cmdFix = &Command{
UsageLine: "fix -dir=/tmp -volumeId=234",
Short: "run weed tool fix on index file if corrupted",
- Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
+ Long: `Fix runs the SeeweedFS fix command to re-create the index .idx file.
`,
}
diff --git a/go/weed/master.go b/go/weed/master.go
index 6617c8ca6..de4b5cb4b 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -1,16 +1,17 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/util"
- "github.com/chrislusf/weed-fs/go/weed/weed_server"
- "github.com/gorilla/mux"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
+ "github.com/gorilla/mux"
)
func init() {
@@ -29,6 +30,7 @@ var cmdMaster = &Command{
var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces")
+ masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
mPublicIp = cmdMaster.Flag.String("publicIp", "", "peer accessible <ip>|<server_name>")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
@@ -40,6 +42,7 @@ var (
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission")
masterWhiteList []string
)
@@ -58,10 +61,11 @@ func runMaster(cmd *Command, args []string) bool {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
- *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
+ *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold,
+ masterWhiteList, *masterSecureKey,
)
- listeningAddress := *masterIp + ":" + strconv.Itoa(*mport)
+ listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)
diff --git a/go/weed/mount.go b/go/weed/mount.go
index 0290e0f53..66e645387 100644
--- a/go/weed/mount.go
+++ b/go/weed/mount.go
@@ -1,7 +1,5 @@
package main
-import ()
-
type MountOptions struct {
filer *string
dir *string
diff --git a/go/weed/mount_std.go b/go/weed/mount_std.go
index e5fc0986c..808c6c563 100644
--- a/go/weed/mount_std.go
+++ b/go/weed/mount_std.go
@@ -3,15 +3,16 @@
package main
import (
+ "fmt"
+ "os"
+ "runtime"
+
"bazil.org/fuse"
"bazil.org/fuse/fs"
"github.com/chrislusf/weed-fs/go/filer"
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
- "fmt"
- "os"
- "runtime"
)
func runMount(cmd *Command, args []string) bool {
diff --git a/go/weed/server.go b/go/weed/server.go
index 1d854d641..0b973f7e1 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -1,10 +1,6 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/util"
- "github.com/chrislusf/weed-fs/go/weed/weed_server"
- "github.com/gorilla/mux"
"net/http"
"os"
"runtime"
@@ -13,6 +9,11 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
+ "github.com/gorilla/mux"
)
type ServerOptions struct {
@@ -31,17 +32,17 @@ func init() {
var cmdServer = &Command{
UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name",
Short: "start a server, including volume server, and automatically elect a master server",
- Long: `start both a volume server to provide storage spaces
+ Long: `start both a volume server to provide storage spaces
and a master server to provide volume=>location mapping service and sequence number of file ids
-
+
This is provided as a convenient way to start both volume server and master server.
The servers are exactly the same as starting them separately.
So other volume servers can use this embedded master server also.
-
+
Optionally, one filer server can be started. Logically, filer servers should not be in a cluster.
They run with meta data on disk, not shared. So each filer server is different.
-
+
`,
}
@@ -55,6 +56,7 @@ var (
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
+ serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access")
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@@ -72,12 +74,18 @@ var (
)
func init() {
- serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "write cpu profile to file")
+ serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
+ filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
+ filerOptions.cassandra_server = cmdFiler.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
+ filerOptions.cassandra_keyspace = cmdFiler.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
+ filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
+ filerOptions.redis_database = cmdFiler.Flag.Int("filer.redis.database", 0, "the database on the redis server")
+
}
func runServer(cmd *Command, args []string) bool {
@@ -98,6 +106,10 @@ func runServer(cmd *Command, args []string) bool {
}
}
+ if *filerOptions.redirectOnRead {
+ *isStartingFiler = true
+ }
+
*filerOptions.master = *serverPublicIp + ":" + strconv.Itoa(*masterPort)
if *filerOptions.defaultReplicaPlacement == "" {
@@ -149,7 +161,11 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingFiler {
go func() {
r := http.NewServeMux()
- _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection)
+ _, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
+ *filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
+ "", "",
+ "", 0,
+ )
if nfs_err != nil {
glog.Fatalf(nfs_err.Error())
}
@@ -176,7 +192,8 @@ func runServer(cmd *Command, args []string) bool {
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
- *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, serverWhiteList,
+ *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
+ serverWhiteList, *serverSecureKey,
)
glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort))
@@ -208,8 +225,8 @@ func runServer(cmd *Command, args []string) bool {
time.Sleep(100 * time.Millisecond)
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *serverIp, *volumePort, *serverPublicIp, folders, maxCounts,
- *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, serverWhiteList,
- *volumeFixJpgOrientation,
+ *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
+ serverWhiteList, *volumeFixJpgOrientation,
)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
diff --git a/go/weed/shell.go b/go/weed/shell.go
index c8043e0dd..f2c4990ea 100644
--- a/go/weed/shell.go
+++ b/go/weed/shell.go
@@ -2,9 +2,10 @@ package main
import (
"bufio"
- "github.com/chrislusf/weed-fs/go/glog"
"fmt"
"os"
+
+ "github.com/chrislusf/weed-fs/go/glog"
)
func init() {
diff --git a/go/weed/signal_handling_notsupported.go b/go/weed/signal_handling_notsupported.go
index ad4f37b0c..343cf7de2 100644
--- a/go/weed/signal_handling_notsupported.go
+++ b/go/weed/signal_handling_notsupported.go
@@ -2,7 +2,5 @@
package main
-import ()
-
func OnInterrupt(fn func()) {
}
diff --git a/go/weed/upload.go b/go/weed/upload.go
index 4eae4d274..2d67c0bd9 100644
--- a/go/weed/upload.go
+++ b/go/weed/upload.go
@@ -1,11 +1,12 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
"fmt"
"os"
"path/filepath"
+
+ "github.com/chrislusf/weed-fs/go/operation"
)
var (
diff --git a/go/weed/version.go b/go/weed/version.go
index 63441509e..8d3a6fed7 100644
--- a/go/weed/version.go
+++ b/go/weed/version.go
@@ -1,9 +1,10 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/util"
"fmt"
"runtime"
+
+ "github.com/chrislusf/weed-fs/go/util"
)
var cmdVersion = &Command{
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 17d03f0c5..1683e1927 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -1,15 +1,16 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/util"
- "github.com/chrislusf/weed-fs/go/weed/weed_server"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/chrislusf/weed-fs/go/weed/weed_server"
)
func init() {
@@ -26,11 +27,12 @@ var cmdVolume = &Command{
var (
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
+ volumeSecurePort = cmdVolume.Flag.Int("port.secure", 8443, "https listen port, active when SSL certs are specified. Not ready yet.")
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
ip = cmdVolume.Flag.String("ip", "", "ip or server name")
publicIp = cmdVolume.Flag.String("publicIp", "", "Publicly accessible <ip|server_name>")
- bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ volumeBindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
vTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds")
@@ -69,6 +71,7 @@ func runVolume(cmd *Command, args []string) bool {
if *publicIp == "" {
if *ip == "" {
+ *ip = "127.0.0.1"
*publicIp = "localhost"
} else {
*publicIp = *ip
@@ -81,11 +84,12 @@ func runVolume(cmd *Command, args []string) bool {
r := http.NewServeMux()
volumeServer := weed_server.NewVolumeServer(r, *ip, *vport, *publicIp, folders, maxCounts,
- *masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList,
+ *masterNode, *vpulse, *dataCenter, *rack,
+ volumeWhiteList,
*fixJpgOrientation,
)
- listeningAddress := *bindIp + ":" + strconv.Itoa(*vport)
+ listeningAddress := *volumeBindIp + ":" + strconv.Itoa(*vport)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress)
diff --git a/go/weed/volume_test.go b/go/weed/volume_test.go
index 764362a2b..ef00a8c7c 100644
--- a/go/weed/volume_test.go
+++ b/go/weed/volume_test.go
@@ -1,10 +1,11 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
"net/http"
"testing"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
)
func TestXYZ(t *testing.T) {
diff --git a/go/weed/weed.go b/go/weed/weed.go
index c1f5a72de..c304b7f35 100644
--- a/go/weed/weed.go
+++ b/go/weed/weed.go
@@ -1,7 +1,6 @@
package main
import (
- "github.com/chrislusf/weed-fs/go/glog"
"flag"
"fmt"
"io"
@@ -13,6 +12,8 @@ import (
"time"
"unicode"
"unicode/utf8"
+
+ "github.com/chrislusf/weed-fs/go/glog"
)
var IsDebug *bool
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go
index 816107dc5..44ffcce47 100644
--- a/go/weed/weed_server/common.go
+++ b/go/weed/weed_server/common.go
@@ -2,18 +2,19 @@ package weed_server
import (
"bytes"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/stats"
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
+ "errors"
"fmt"
- "net"
"net/http"
"path/filepath"
"strconv"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
)
var serverStats *stats.ServerStats
@@ -24,7 +25,7 @@ func init() {
}
-func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) {
+func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte
if r.FormValue("pretty") != "" {
bytes, err = json.MarshalIndent(obj, "", " ")
@@ -37,9 +38,11 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err err
callback := r.FormValue("callback")
if callback == "" {
w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(httpStatus)
_, err = w.Write(bytes)
} else {
w.Header().Set("Content-Type", "application/javascript")
+ w.WriteHeader(httpStatus)
if _, err = w.Write([]uint8(callback)); err != nil {
return
}
@@ -51,64 +54,45 @@ func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err err
return
}
}
+
return
}
// wrapper for writeJson - just logs errors
-func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) {
- if err := writeJson(w, r, obj); err != nil {
+func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
+ if err := writeJson(w, r, httpStatus, obj); err != nil {
glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error())
}
}
-func writeJsonError(w http.ResponseWriter, r *http.Request, err error) {
- w.WriteHeader(http.StatusInternalServerError)
+func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
m := make(map[string]interface{})
m["error"] = err.Error()
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, httpStatus, m)
}
func debug(params ...interface{}) {
glog.V(4).Infoln(params)
}
-func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
- return func(w http.ResponseWriter, r *http.Request) {
- if len(whiteList) == 0 {
- f(w, r)
- return
- }
- host, _, err := net.SplitHostPort(r.RemoteAddr)
- if err == nil {
- for _, ip := range whiteList {
- if ip == host {
- f(w, r)
- return
- }
- }
- }
- writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host})
- }
-}
-
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
m := make(map[string]interface{})
if r.Method != "POST" {
- m["error"] = "Only submit via POST!"
- writeJsonQuiet(w, r, m)
+ writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
return
}
debug("parsing upload file...")
fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
if pe != nil {
- writeJsonError(w, r, pe)
+ writeJsonError(w, r, http.StatusBadRequest, pe)
return
}
debug("assigning file id for", fname)
+ r.ParseForm()
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl"))
if ae != nil {
- writeJsonError(w, r, ae)
+ writeJsonError(w, r, http.StatusInternalServerError, ae)
return
}
@@ -120,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
debug("upload file to store", url)
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
if err != nil {
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
@@ -128,7 +112,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
m["fid"] = assignResult.Fid
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
m["size"] = uploadResult.Size
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusCreated, m)
return
}
@@ -137,10 +121,10 @@ func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
fids := r.Form["fid"]
ret, err := operation.DeleteFiles(masterUrl, fids)
if err != nil {
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- writeJsonQuiet(w, r, ret)
+ writeJsonQuiet(w, r, http.StatusAccepted, ret)
}
func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) {
@@ -180,12 +164,12 @@ func statsCounterHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
m["Counters"] = serverStats
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
m["Memory"] = stats.MemStat()
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go
index 5ff0ed986..18a02b5e0 100644
--- a/go/weed/weed_server/filer_server.go
+++ b/go/weed/weed_server/filer_server.go
@@ -1,32 +1,57 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/filer"
- "github.com/chrislusf/weed-fs/go/glog"
"net/http"
"strconv"
+
+ "github.com/chrislusf/weed-fs/go/filer"
+ "github.com/chrislusf/weed-fs/go/filer/cassandra_store"
+ "github.com/chrislusf/weed-fs/go/filer/embedded_filer"
+ "github.com/chrislusf/weed-fs/go/filer/flat_namespace"
+ "github.com/chrislusf/weed-fs/go/filer/redis_store"
+ "github.com/chrislusf/weed-fs/go/glog"
)
type FilerServer struct {
- port string
- master string
- collection string
- filer filer.Filer
+ port string
+ master string
+ collection string
+ defaultReplication string
+ redirectOnRead bool
+ filer filer.Filer
}
-func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string) (fs *FilerServer, err error) {
+func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
+ replication string, redirectOnRead bool,
+ cassandra_server string, cassandra_keyspace string,
+ redis_server string, redis_database int,
+) (fs *FilerServer, err error) {
fs = &FilerServer{
- master: master,
- collection: collection,
- port: ":" + strconv.Itoa(port),
+ master: master,
+ collection: collection,
+ defaultReplication: replication,
+ redirectOnRead: redirectOnRead,
+ port: ":" + strconv.Itoa(port),
}
- if fs.filer, err = filer.NewFilerEmbedded(master, dir); err != nil {
- glog.Fatal("Can not start filer in dir", dir, ": ", err.Error())
- return
+ if cassandra_server != "" {
+ cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
+ if err != nil {
+ glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
+ }
+ fs.filer = flat_namespace.NewFlatNamesapceFiler(master, cassandra_store)
+ } else if redis_server != "" {
+ redis_store := redis_store.NewRedisStore(redis_server, redis_database)
+ fs.filer = flat_namespace.NewFlatNamesapceFiler(master, redis_store)
+ } else {
+ if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
+ glog.Fatalf("Can not start filer in dir %s : %v", err)
+ return
+ }
+
+ r.HandleFunc("/admin/mv", fs.moveHandler)
}
- r.HandleFunc("/admin/mv", fs.moveHandler)
r.HandleFunc("/", fs.filerHandler)
return fs, nil
diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go
index e36e7c310..0811b7973 100644
--- a/go/weed/weed_server/filer_server_handlers.go
+++ b/go/weed/weed_server/filer_server_handlers.go
@@ -1,12 +1,8 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
- "github.com/syndtr/goleveldb/leveldb"
"io"
"io/ioutil"
"math/rand"
@@ -14,6 +10,11 @@ import (
"net/url"
"strconv"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/syndtr/goleveldb/leveldb"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
@@ -49,7 +50,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
limit = 100
}
m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit)
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
if strings.HasSuffix(r.URL.Path, "/") {
@@ -80,7 +81,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
urlLocation := lookup.Locations[rand.Intn(len(lookup.Locations))].PublicUrl
- u, _ := url.Parse("http://" + urlLocation + "/" + fileId)
+ urlString := "http://" + urlLocation + "/" + fileId
+ if fs.redirectOnRead {
+ http.Redirect(w, r, urlString, http.StatusFound)
+ return
+ }
+ u, _ := url.Parse(urlString)
request := &http.Request{
Method: r.Method,
URL: u,
@@ -96,7 +102,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
resp, do_err := util.Do(request)
if do_err != nil {
glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
- writeJsonError(w, r, do_err)
+ writeJsonError(w, r, http.StatusInternalServerError, do_err)
return
}
defer resp.Body.Close()
@@ -109,10 +115,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
- assignResult, ae := operation.Assign(fs.master, 1, query.Get("replication"), fs.collection, query.Get("ttl"))
+ replication := query.Get("replication")
+ if replication == "" {
+ replication = fs.defaultReplication
+ }
+ assignResult, ae := operation.Assign(fs.master, 1, replication, fs.collection, query.Get("ttl"))
if ae != nil {
glog.V(0).Infoln("failing to assign a file id", ae.Error())
- writeJsonError(w, r, ae)
+ writeJsonError(w, r, http.StatusInternalServerError, ae)
return
}
@@ -132,14 +142,14 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
resp, do_err := util.Do(request)
if do_err != nil {
glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
- writeJsonError(w, r, do_err)
+ writeJsonError(w, r, http.StatusInternalServerError, do_err)
return
}
defer resp.Body.Close()
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
- writeJsonError(w, r, ra_err)
+ writeJsonError(w, r, http.StatusInternalServerError, ra_err)
return
}
glog.V(4).Infoln("post result", string(resp_body))
@@ -147,12 +157,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
- writeJsonError(w, r, unmarshal_err)
+ writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
return
}
if ret.Error != "" {
glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, errors.New(ret.Error))
+ writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
return
}
path := r.URL.Path
@@ -162,18 +172,20 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
} else {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, errors.New("Can not to write to folder "+path+" without a file name"))
+ writeJsonError(w, r, http.StatusInternalServerError,
+ errors.New("Can not to write to folder "+path+" without a file name"))
return
}
}
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
- glog.V(0).Infoln("failing to write to filer server", r.RequestURI, db_err.Error())
- writeJsonError(w, r, db_err)
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
+ writeJsonError(w, r, http.StatusInternalServerError, db_err)
return
}
w.WriteHeader(http.StatusCreated)
+ w.Write(resp_body)
}
// curl -X DELETE http://localhost:8888/path/to
@@ -191,10 +203,9 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
}
if err == nil {
- w.WriteHeader(http.StatusAccepted)
- writeJsonQuiet(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
}
}
diff --git a/go/weed/weed_server/filer_server_handlers_admin.go b/go/weed/weed_server/filer_server_handlers_admin.go
index ff52dff24..3f67b30f1 100644
--- a/go/weed/weed_server/filer_server_handlers_admin.go
+++ b/go/weed/weed_server/filer_server_handlers_admin.go
@@ -1,8 +1,9 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
"net/http"
+
+ "github.com/chrislusf/weed-fs/go/glog"
)
/*
@@ -21,7 +22,7 @@ func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) {
err := fs.filer.Move(from, to)
if err != nil {
glog.V(4).Infoln("moving", from, "->", to, err.Error())
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
w.WriteHeader(http.StatusOK)
}
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 401f6cfdb..056b1fe7b 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -1,16 +1,19 @@
package weed_server
import (
+ "fmt"
+ "net/http"
+ "net/http/httputil"
+ "net/url"
+ "sync"
+
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/topology"
"github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft"
"github.com/gorilla/mux"
- "net/http"
- "net/http/httputil"
- "net/url"
- "sync"
)
type MasterServer struct {
@@ -20,7 +23,6 @@ type MasterServer struct {
pulseSeconds int
defaultReplicaPlacement string
garbageThreshold string
- whiteList []string
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -36,6 +38,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
defaultReplicaPlacement string,
garbageThreshold string,
whiteList []string,
+ secureKey string,
) *MasterServer {
ms := &MasterServer{
port: port,
@@ -43,7 +46,6 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
pulseSeconds: pulseSeconds,
defaultReplicaPlacement: defaultReplicaPlacement,
garbageThreshold: garbageThreshold,
- whiteList: whiteList,
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
@@ -55,20 +57,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
- r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.dirLookupHandler)))
- r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.volumeLookupHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler))
- r.HandleFunc("/delete", secure(ms.whiteList, ms.deleteFromMasterServerHandler))
+ guard := security.NewGuard(whiteList, secureKey)
+
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler)))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler))
+ r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.redirectHandler)
- r.HandleFunc("/stats/counter", secure(ms.whiteList, statsCounterHandler))
- r.HandleFunc("/stats/memory", secure(ms.whiteList, statsMemoryHandler))
+ r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler))
+ r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler))
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
@@ -100,7 +104,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
- writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL http://" + ms.Topo.RaftServer.Leader() + " Parse Error " + err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError,
+ fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
return
}
glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index 93e9e7d9a..7b6ce58f3 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -1,12 +1,14 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/stats"
- "github.com/chrislusf/weed-fs/go/storage"
+ "fmt"
"net/http"
"strconv"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
)
func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
@@ -49,10 +51,11 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
collection := r.FormValue("collection") //optional, but can be faster if too many collections
volumeLocations := ms.lookupVolumeId(vids, collection)
location := volumeLocations[vid]
+ httpStatus := http.StatusOK
if location.Error != "" {
- w.WriteHeader(http.StatusNotFound)
+ httpStatus = http.StatusNotFound
}
- writeJsonQuiet(w, r, location)
+ writeJsonQuiet(w, r, httpStatus, location)
}
// This can take batched volumeIds, &volumeId=x&volumeId=y&volumeId=z
@@ -61,7 +64,7 @@ func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Reque
vids := r.Form["volumeId"]
collection := r.FormValue("collection") //optional, but can be faster if too many collections
volumeLocations := ms.lookupVolumeId(vids, collection)
- writeJsonQuiet(w, r, volumeLocations)
+ writeJsonQuiet(w, r, http.StatusOK, volumeLocations)
}
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
@@ -73,22 +76,21 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
option, err := ms.getVolumeGrowOption(r)
if err != nil {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJsonQuiet(w, r, operation.AssignResult{Error: err.Error()})
+ writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
return
}
- if !ms.Topo.HasWriableVolume(option) {
+ if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
- w.WriteHeader(http.StatusNotFound)
- writeJsonQuiet(w, r, operation.AssignResult{Error: "No free volumes left!"})
+ writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
return
} else {
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
- if !ms.Topo.HasWriableVolume(option) {
+ if !ms.Topo.HasWritableVolume(option) {
if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
- writeJsonQuiet(w, r, operation.AssignResult{Error: "Cannot grow volume group! " + err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError,
+ fmt.Errorf("Cannot grow volume group! %v", err))
return
}
}
@@ -96,9 +98,8 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
- writeJsonQuiet(w, r, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
+ writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJsonQuiet(w, r, operation.AssignResult{Error: err.Error()})
+ writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
}
}
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index c9a8020c2..437044eee 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -1,30 +1,32 @@
package weed_server
import (
- proto "code.google.com/p/goprotobuf/proto"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/topology"
- "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
+ "fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/topology"
+ "github.com/chrislusf/weed-fs/go/util"
+ "github.com/golang/protobuf/proto"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
if !ok {
- writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"})
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist!", r.FormValue("collection")))
return
}
for _, server := range collection.ListVolumeServers() {
_, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection"))
if err != nil {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
}
@@ -34,12 +36,12 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
return
}
joinMessage := &operation.JoinMessage{}
if err = proto.Unmarshal(body, joinMessage); err != nil {
- writeJsonError(w, r, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
return
}
if *joinMessage.Ip == "" {
@@ -48,7 +50,7 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
if glog.V(4) {
if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil {
glog.V(0).Infoln("json marshaling error: ", jsonError)
- writeJsonError(w, r, jsonError)
+ writeJsonError(w, r, http.StatusBadRequest, jsonError)
return
} else {
glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData))
@@ -56,14 +58,14 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
ms.Topo.ProcessJoinMessage(joinMessage)
- writeJsonQuiet(w, r, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
+ writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
m["Topology"] = ms.Topo.ToMap()
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
@@ -80,8 +82,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
count := 0
option, err := ms.getVolumeGrowOption(r)
if err != nil {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusNotAcceptable, err)
return
}
if err == nil {
@@ -96,11 +97,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
}
if err != nil {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusNotAcceptable, err)
} else {
- w.WriteHeader(http.StatusOK)
- writeJsonQuiet(w, r, map[string]interface{}{"count": count})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
}
}
@@ -108,7 +107,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
m := make(map[string]interface{})
m["Version"] = util.VERSION
m["Volumes"] = ms.Topo.ToVolumeMap()
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
@@ -122,8 +121,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
if machines != nil && len(machines) > 0 {
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
} else {
- w.WriteHeader(http.StatusNotFound)
- writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found.", volumeId))
}
}
@@ -143,7 +141,7 @@ func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *
}
}
-func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
+func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go
index e41867076..b9aaef2b0 100644
--- a/go/weed/weed_server/raft_server.go
+++ b/go/weed/weed_server/raft_server.go
@@ -2,19 +2,22 @@ package weed_server
import (
"bytes"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/topology"
"encoding/json"
"errors"
"fmt"
- "github.com/goraft/raft"
- "github.com/gorilla/mux"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
+ "os"
+ "path"
"strings"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/topology"
+ "github.com/goraft/raft"
+ "github.com/gorilla/mux"
)
type RaftServer struct {
@@ -44,6 +47,14 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
var err error
transporter := raft.NewHTTPTransporter("/cluster", 0)
transporter.Transport.MaxIdleConnsPerHost = 1024
+ glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr)
+
+ // Clear old cluster configurations if peers are set
+ if len(s.peers) > 0 {
+ os.RemoveAll(path.Join(s.dataDir, "conf"))
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ }
s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
if err != nil {
@@ -52,35 +63,30 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
}
transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(1 * time.Second)
- s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 1150 * time.Millisecond)
+ s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond)
s.raftServer.Start()
s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST")
s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
- // Join to leader if specified.
if len(s.peers) > 0 {
- if !s.raftServer.IsLogEmpty() {
- glog.V(0).Infoln("Starting cluster with existing logs.")
- } else {
- glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
- time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
- firstJoinError := s.Join(s.peers)
- if firstJoinError != nil {
- glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: "http://" + s.httpAddr,
- })
- if err != nil {
- glog.V(0).Infoln(err)
- return nil
- }
+ // Join to leader if specified.
+ glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ","))
+ time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
+ firstJoinError := s.Join(s.peers)
+ if firstJoinError != nil {
+ glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.")
+ _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: "http://" + s.httpAddr,
+ })
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return nil
}
}
-
- // Initialize the server by joining itself.
} else if s.raftServer.IsLogEmpty() {
+ // Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
@@ -94,7 +100,7 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
}
} else {
- glog.V(0).Infoln("Recovered from log")
+ glog.V(0).Infoln("Old conf,log,snapshot should have been removed.")
}
return s
diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go
index 4d51c0767..629de248b 100644
--- a/go/weed/weed_server/raft_server_handlers.go
+++ b/go/weed/weed_server/raft_server_handlers.go
@@ -1,13 +1,14 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
"encoding/json"
- "github.com/goraft/raft"
"io/ioutil"
"net/http"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/goraft/raft"
)
// Handles incoming RAFT joins.
@@ -59,5 +60,5 @@ func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
if leader, e := s.topo.Leader(); e == nil {
ret.Leader = leader
}
- writeJsonQuiet(w, r, ret)
+ writeJsonQuiet(w, r, http.StatusOK, ret)
}
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index 2a9085f3b..9ceeb0149 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -1,12 +1,14 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"net/http"
"strconv"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/security"
+ "github.com/chrislusf/weed-fs/go/storage"
)
type VolumeServer struct {
@@ -14,8 +16,8 @@ type VolumeServer struct {
pulseSeconds int
dataCenter string
rack string
- whiteList []string
store *storage.Store
+ guard *security.Guard
FixJpgOrientation bool
}
@@ -23,29 +25,31 @@ type VolumeServer struct {
func NewVolumeServer(r *http.ServeMux, ip string, port int, publicIp string, folders []string, maxCounts []int,
masterNode string, pulseSeconds int,
dataCenter string, rack string,
- whiteList []string, fixJpgOrientation bool) *VolumeServer {
+ whiteList []string,
+ fixJpgOrientation bool) *VolumeServer {
publicUrl := publicIp + ":" + strconv.Itoa(port)
vs := &VolumeServer{
masterNode: masterNode,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
- whiteList: whiteList,
FixJpgOrientation: fixJpgOrientation,
}
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts)
- r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler))
- r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler))
- r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler))
- r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler))
- r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler))
- r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler))
- r.HandleFunc("/admin/delete_collection", secure(vs.whiteList, vs.deleteCollectionHandler))
- r.HandleFunc("/stats/counter", secure(vs.whiteList, statsCounterHandler))
- r.HandleFunc("/stats/memory", secure(vs.whiteList, statsMemoryHandler))
- r.HandleFunc("/stats/disk", secure(vs.whiteList, vs.statsDiskHandler))
- r.HandleFunc("/delete", secure(vs.whiteList, vs.batchDeleteHandler))
+ vs.guard = security.NewGuard(whiteList, "")
+
+ r.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
+ r.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
+ r.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
+ r.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
+ r.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
+ r.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
+ r.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
+ r.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
+ r.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
+ r.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
+ r.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
r.HandleFunc("/", vs.storeHandler)
go func() {
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index ce14f6a87..6b47ee84d 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -1,12 +1,7 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/images"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/stats"
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/topology"
+ "errors"
"io"
"mime"
"mime/multipart"
@@ -14,6 +9,13 @@ import (
"strconv"
"strings"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/images"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/stats"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/topology"
)
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
@@ -28,13 +30,13 @@ func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) {
vs.GetOrHeadHandler(w, r)
case "DELETE":
stats.DeleteRequest()
- secure(vs.whiteList, vs.DeleteHandler)(w, r)
+ vs.guard.Secure(vs.DeleteHandler)(w, r)
case "PUT":
stats.WriteRequest()
- secure(vs.whiteList, vs.PostHandler)(w, r)
+ vs.guard.Secure(vs.PostHandler)(w, r)
case "POST":
stats.WriteRequest()
- secure(vs.whiteList, vs.PostHandler)(w, r)
+ vs.guard.Secure(vs.PostHandler)(w, r)
}
}
@@ -234,35 +236,34 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e)
- writeJsonError(w, r, e)
+ writeJsonError(w, r, http.StatusBadRequest, e)
return
}
vid, _, _, _, _ := parseURLPath(r.URL.Path)
volumeId, ve := storage.NewVolumeId(vid)
if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve)
- writeJsonError(w, r, ve)
+ writeJsonError(w, r, http.StatusBadRequest, ve)
return
}
needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation)
if ne != nil {
- writeJsonError(w, r, ne)
+ writeJsonError(w, r, http.StatusBadRequest, ne)
return
}
ret := operation.UploadResult{}
size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
- if errorStatus == "" {
- w.WriteHeader(http.StatusCreated)
- } else {
- w.WriteHeader(http.StatusInternalServerError)
+ httpStatus := http.StatusCreated
+ if errorStatus != "" {
+ httpStatus = http.StatusInternalServerError
ret.Error = errorStatus
}
if needle.HasName() {
ret.Name = string(needle.Name)
}
ret.Size = size
- writeJsonQuiet(w, r, ret)
+ writeJsonQuiet(w, r, httpStatus, ret)
}
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -279,7 +280,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusNotFound, m)
return
}
@@ -292,14 +293,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
if ret != 0 {
- w.WriteHeader(http.StatusAccepted)
+ m := make(map[string]uint32)
+ m["size"] = uint32(count)
+ writeJsonQuiet(w, r, http.StatusAccepted, m)
} else {
- w.WriteHeader(http.StatusInternalServerError)
+ writeJsonError(w, r, http.StatusInternalServerError, errors.New("Deletion Failed."))
}
- m := make(map[string]uint32)
- m["size"] = uint32(count)
- writeJsonQuiet(w, r, m)
}
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
@@ -333,7 +333,5 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
}
}
- w.WriteHeader(http.StatusAccepted)
-
- writeJsonQuiet(w, r, ret)
+ writeJsonQuiet(w, r, http.StatusAccepted, ret)
}
diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go
index 1118c8017..1581a5770 100644
--- a/go/weed/weed_server/volume_server_handlers_admin.go
+++ b/go/weed/weed_server/volume_server_handlers_admin.go
@@ -1,26 +1,27 @@
package weed_server
import (
+ "net/http"
+ "path/filepath"
+
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/util"
- "net/http"
- "path/filepath"
)
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
m["Volumes"] = vs.store.Status()
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replication"), r.FormValue("ttl"))
if err == nil {
- writeJsonQuiet(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
} else {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusNotAcceptable, err)
}
glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err)
}
@@ -32,9 +33,9 @@ func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.R
}
err := vs.store.DeleteCollection(r.FormValue("collection"))
if err == nil {
- writeJsonQuiet(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err)
}
@@ -43,9 +44,9 @@ func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Reque
//TODO: notify master that this volume will be read-only
err := vs.store.FreezeVolume(r.FormValue("volume"))
if err == nil {
- writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("freeze volume =", r.FormValue("volume"), ", error =", err)
}
@@ -60,5 +61,5 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
}
}
m["DiskStatues"] = ds
- writeJsonQuiet(w, r, m)
+ writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/go/weed/weed_server/volume_server_handlers_vacuum.go b/go/weed/weed_server/volume_server_handlers_vacuum.go
index b0600d799..cb30e10b4 100644
--- a/go/weed/weed_server/volume_server_handlers_vacuum.go
+++ b/go/weed/weed_server/volume_server_handlers_vacuum.go
@@ -1,34 +1,35 @@
package weed_server
import (
- "github.com/chrislusf/weed-fs/go/glog"
"net/http"
+
+ "github.com/chrislusf/weed-fs/go/glog"
)
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
if err == nil {
- writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"error": "", "result": ret})
} else {
- writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false})
+ writeJsonQuiet(w, r, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "result": false})
}
glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
}
func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.CompactVolume(r.FormValue("volume"))
if err == nil {
- writeJsonQuiet(w, r, map[string]string{"error": ""})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err)
}
func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
err := vs.store.CommitCompactVolume(r.FormValue("volume"))
if err == nil {
- writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
+ writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""})
} else {
- writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ writeJsonError(w, r, http.StatusInternalServerError, err)
}
glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err)
}