diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 90 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 532 | ||||
| -rw-r--r-- | weed/command/command.go | 71 | ||||
| -rw-r--r-- | weed/command/compact.go | 45 | ||||
| -rw-r--r-- | weed/command/download.go | 130 | ||||
| -rw-r--r-- | weed/command/export.go | 213 | ||||
| -rw-r--r-- | weed/command/filer.go | 105 | ||||
| -rw-r--r-- | weed/command/fix.go | 70 | ||||
| -rw-r--r-- | weed/command/master.go | 91 | ||||
| -rw-r--r-- | weed/command/mount.go | 35 | ||||
| -rw-r--r-- | weed/command/mount_notsupported.go | 15 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 106 | ||||
| -rw-r--r-- | weed/command/server.go | 291 | ||||
| -rw-r--r-- | weed/command/shell.go | 61 | ||||
| -rw-r--r-- | weed/command/signal_handling.go | 31 | ||||
| -rw-r--r-- | weed/command/signal_handling_notsupported.go | 6 | ||||
| -rw-r--r-- | weed/command/upload.go | 108 | ||||
| -rw-r--r-- | weed/command/version.go | 24 | ||||
| -rw-r--r-- | weed/command/volume.go | 165 | ||||
| -rw-r--r-- | weed/command/volume_test.go | 13 |
20 files changed, 2202 insertions, 0 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go new file mode 100644 index 000000000..0b3994027 --- /dev/null +++ b/weed/command/backup.go @@ -0,0 +1,90 @@ +package command + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +var ( + s BackupOptions +) + +type BackupOptions struct { + master *string + collection *string + dir *string + volumeId *int +} + +func init() { + cmdBackup.Run = runBackup // break init cycle + s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location") + s.collection = cmdBackup.Flag.String("collection", "", "collection name") + s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files") + s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") +} + +var cmdBackup = &Command{ + UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333", + Short: "incrementally backup a volume to local folder", + Long: `Incrementally backup volume data. + + It is expected that you use this inside a script, to loop through + all possible volume ids that needs to be backup to local folder. + + The volume id does not need to exist locally or even remotely. + This will help to backup future new volumes. + + Usually backing up is just copying the .dat (and .idx) files. + But it's tricky to incremententally copy the differences. + + The complexity comes when there are multiple addition, deletion and compaction. + This tool will handle them correctly and efficiently, avoiding unnecessary data transporation. + `, +} + +func runBackup(cmd *Command, args []string) bool { + if *s.volumeId == -1 { + return false + } + vid := storage.VolumeId(*s.volumeId) + + // find volume location, replication, ttl info + lookup, err := operation.Lookup(*s.master, vid.String()) + if err != nil { + fmt.Printf("Error looking up volume %d: %v\n", vid, err) + return true + } + volumeServer := lookup.Locations[0].Url + + stats, err := operation.GetVolumeSyncStatus(volumeServer, vid.String()) + if err != nil { + fmt.Printf("Error get volume %d status: %v\n", vid, err) + return true + } + ttl, err := storage.ReadTTL(stats.Ttl) + if err != nil { + fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err) + return true + } + replication, err := storage.NewReplicaPlacementFromString(stats.Replication) + if err != nil { + fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err) + return true + } + + v, err := storage.NewVolume(*s.dir, *s.collection, vid, storage.NeedleMapInMemory, replication, ttl) + if err != nil { + fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err) + return true + } + + if err := v.Synchronize(volumeServer); err != nil { + fmt.Printf("Error synchronizing volume %d: %v\n", vid, err) + return true + } + + return true +} diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go new file mode 100644 index 000000000..7e0802e30 --- /dev/null +++ b/weed/command/benchmark.go @@ -0,0 +1,532 @@ +package command + +import ( + "bufio" + "fmt" + "io" + "math" + "math/rand" + "os" + "runtime" + "runtime/pprof" + "sort" + "strings" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type BenchmarkOptions struct { + server *string + concurrency *int + numberOfFiles *int + fileSize *int + idListFile *string + write *bool + deletePercentage *int + read *bool + sequentialRead *bool + collection *string + cpuprofile *string + maxCpu *int + secretKey *string +} + +var ( + b BenchmarkOptions + sharedBytes []byte +) + +func init() { + cmdBenchmark.Run = runbenchmark // break init cycle + cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information") + b.server = cmdBenchmark.Flag.String("server", "localhost:9333", "SeaweedFS master location") + b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes") + b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding") + b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread") + b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids") + b.write = cmdBenchmark.Flag.Bool("write", true, "enable write") + b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes") + b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") + b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") + b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") + b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") + b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + sharedBytes = make([]byte, 1024) +} + +var cmdBenchmark = &Command{ + UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", + Short: "benchmark on writing millions of files and read out", + Long: `benchmark on an empty SeaweedFS file system. + + Two tests during benchmark: + 1) write lots of small files to the system + 2) read the files out + + The file content is mostly zero, but no compression is done. + + You can choose to only benchmark read or write. + During write, the list of uploaded file ids is stored in "-list" specified file. + You can also use your own list of file ids to run read test. + + Write speed and read speed will be collected. + The numbers are used to get a sense of the system. + Usually your network or the hard drive is the real bottleneck. + + Another thing to watch is whether the volumes are evenly distributed + to each volume server. Because the 7 more benchmark volumes are randomly distributed + to servers with free slots, it's highly possible some servers have uneven amount of + benchmark volumes. To remedy this, you can use this to grow the benchmark volumes + before starting the benchmark command: + http://localhost:9333/vol/grow?collection=benchmark&count=5 + + After benchmarking, you can clean up the written data by deleting the benchmark collection + http://localhost:9333/col/delete?collection=benchmark + + `, +} + +var ( + wait sync.WaitGroup + writeStats *stats + readStats *stats +) + +func runbenchmark(cmd *Command, args []string) bool { + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + if *b.maxCpu < 1 { + *b.maxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*b.maxCpu) + if *b.cpuprofile != "" { + f, err := os.Create(*b.cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + if *b.write { + bench_write() + } + + if *b.read { + bench_read() + } + + return true +} + +func bench_write() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + writeStats = newStats(*b.concurrency) + idChan := make(chan int) + go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) + for i := 0; i < *b.concurrency; i++ { + wait.Add(1) + go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i]) + } + writeStats.start = time.Now() + writeStats.total = *b.numberOfFiles + go writeStats.checkProgress("Writing Benchmark", finishChan) + for i := 0; i < *b.numberOfFiles; i++ { + idChan <- i + } + close(idChan) + wait.Wait() + writeStats.end = time.Now() + wait.Add(2) + finishChan <- true + finishChan <- true + wait.Wait() + close(finishChan) + writeStats.printStats() +} + +func bench_read() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + readStats = newStats(*b.concurrency) + go readFileIds(*b.idListFile, fileIdLineChan) + readStats.start = time.Now() + readStats.total = *b.numberOfFiles + go readStats.checkProgress("Randomly Reading Benchmark", finishChan) + for i := 0; i < *b.concurrency; i++ { + wait.Add(1) + go readFiles(fileIdLineChan, &readStats.localStats[i]) + } + wait.Wait() + wait.Add(1) + finishChan <- true + wait.Wait() + close(finishChan) + readStats.end = time.Now() + readStats.printStats() +} + +type delayedFile struct { + enterTime time.Time + fp *operation.FilePart +} + +func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { + defer wait.Done() + delayedDeleteChan := make(chan *delayedFile, 100) + var waitForDeletions sync.WaitGroup + secret := security.Secret(*b.secretKey) + + for i := 0; i < 7; i++ { + waitForDeletions.Add(1) + go func() { + defer waitForDeletions.Done() + for df := range delayedDeleteChan { + if df.enterTime.After(time.Now()) { + time.Sleep(df.enterTime.Sub(time.Now())) + } + if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid, + security.GenJwt(secret, df.fp.Fid)); e == nil { + s.completed++ + } else { + s.failed++ + } + } + }() + } + + for id := range idChan { + start := time.Now() + fileSize := int64(*b.fileSize + rand.Intn(64)) + fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { + fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection + if _, err := fp.Upload(0, *b.server, secret); err == nil { + if rand.Intn(100) < *b.deletePercentage { + s.total++ + delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} + } else { + fileIdLineChan <- fp.Fid + } + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + fmt.Printf("Failed to write with error:%v\n", err) + } + writeStats.addSample(time.Now().Sub(start)) + if *cmdBenchmark.IsDebug { + fmt.Printf("writing %d file %s\n", id, fp.Fid) + } + } else { + s.failed++ + println("writing file error:", err.Error()) + } + } + close(delayedDeleteChan) + waitForDeletions.Wait() +} + +func readFiles(fileIdLineChan chan string, s *stat) { + defer wait.Done() + for fid := range fileIdLineChan { + if len(fid) == 0 { + continue + } + if fid[0] == '#' { + continue + } + if *cmdBenchmark.IsDebug { + fmt.Printf("reading file %s\n", fid) + } + parts := strings.SplitN(fid, ",", 2) + vid := parts[0] + start := time.Now() + ret, err := operation.Lookup(*b.server, vid) + if err != nil || len(ret.Locations) == 0 { + s.failed++ + println("!!!! volume id ", vid, " location not found!!!!!") + continue + } + server := ret.Locations[rand.Intn(len(ret.Locations))].Url + url := "http://" + server + "/" + fid + if bytesRead, err := util.Get(url); err == nil { + s.completed++ + s.transferred += int64(len(bytesRead)) + readStats.addSample(time.Now().Sub(start)) + } else { + s.failed++ + fmt.Printf("Failed to read %s error:%v\n", url, err) + } + } +} + +func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { + file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + glog.Fatalf("File to create file %s: %s\n", fileName, err) + } + defer file.Close() + + for { + select { + case <-finishChan: + wait.Done() + return + case line := <-fileIdLineChan: + file.Write([]byte(line)) + file.Write([]byte("\n")) + } + } +} + +func readFileIds(fileName string, fileIdLineChan chan string) { + file, err := os.Open(fileName) // For read access. + if err != nil { + glog.Fatalf("File to read file %s: %s\n", fileName, err) + } + defer file.Close() + + r := bufio.NewReader(file) + if *b.sequentialRead { + for { + if line, err := Readln(r); err == nil { + fileIdLineChan <- string(line) + } else { + break + } + } + } else { + lines := make([]string, 0, readStats.total) + for { + if line, err := Readln(r); err == nil { + lines = append(lines, string(line)) + } else { + break + } + } + if len(lines) > 0 { + for i := 0; i < readStats.total; i++ { + fileIdLineChan <- lines[rand.Intn(len(lines))] + } + } + } + + close(fileIdLineChan) +} + +const ( + benchResolution = 10000 //0.1 microsecond + benchBucket = 1000000000 / benchResolution +) + +// An efficient statics collecting and rendering +type stats struct { + data []int + overflow []int + localStats []stat + start time.Time + end time.Time + total int +} +type stat struct { + completed int + failed int + total int + transferred int64 +} + +var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100} + +func newStats(n int) *stats { + return &stats{ + data: make([]int, benchResolution), + overflow: make([]int, 0), + localStats: make([]stat, n), + } +} + +func (s *stats) addSample(d time.Duration) { + index := int(d / benchBucket) + if index < 0 { + fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000) + } else if index < len(s.data) { + s.data[int(d/benchBucket)]++ + } else { + s.overflow = append(s.overflow, index) + } +} + +func (s *stats) checkProgress(testName string, finishChan chan bool) { + fmt.Printf("\n------------ %s ----------\n", testName) + ticker := time.Tick(time.Second) + lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now() + for { + select { + case <-finishChan: + wait.Done() + return + case t := <-ticker: + completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + transferred += localStat.transferred + total += localStat.total + } + fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n", + completed, total, float64(completed)*100/float64(total), + float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)), + float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), + ) + lastCompleted, lastTransferred, lastTime = completed, transferred, t + } + } +} + +func (s *stats) printStats() { + completed, failed, transferred, total := 0, 0, int64(0), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + failed += localStat.failed + transferred += localStat.transferred + total += localStat.total + } + timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 + fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) + fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) + fmt.Printf("Complete requests: %d\n", completed) + fmt.Printf("Failed requests: %d\n", failed) + fmt.Printf("Total transferred: %d bytes\n", transferred) + fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken) + fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken) + n, sum := 0, 0 + min, max := 10000000, 0 + for i := 0; i < len(s.data); i++ { + n += s.data[i] + sum += s.data[i] * i + if s.data[i] > 0 { + if min > i { + min = i + } + if max < i { + max = i + } + } + } + n += len(s.overflow) + for i := 0; i < len(s.overflow); i++ { + sum += s.overflow[i] + if min > s.overflow[i] { + min = s.overflow[i] + } + if max < s.overflow[i] { + max = s.overflow[i] + } + } + avg := float64(sum) / float64(n) + varianceSum := 0.0 + for i := 0; i < len(s.data); i++ { + if s.data[i] > 0 { + d := float64(i) - avg + varianceSum += d * d * float64(s.data[i]) + } + } + for i := 0; i < len(s.overflow); i++ { + d := float64(s.overflow[i]) - avg + varianceSum += d * d + } + std := math.Sqrt(varianceSum / float64(n)) + fmt.Printf("\nConnection Times (ms)\n") + fmt.Printf(" min avg max std\n") + fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10) + //printing percentiles + fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n") + percentiles := make([]int, len(percentages)) + for i := 0; i < len(percentages); i++ { + percentiles[i] = n * percentages[i] / 100 + } + percentiles[len(percentiles)-1] = n + percentileIndex := 0 + currentSum := 0 + for i := 0; i < len(s.data); i++ { + currentSum += s.data[i] + if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] { + fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0) + percentileIndex++ + for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] { + percentileIndex++ + } + } + } + sort.Ints(s.overflow) + for i := 0; i < len(s.overflow); i++ { + currentSum++ + if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] { + fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0) + percentileIndex++ + for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] { + percentileIndex++ + } + } + } +} + +// a fake reader to generate content to upload +type FakeReader struct { + id uint64 // an id number + size int64 // max bytes +} + +func (l *FakeReader) Read(p []byte) (n int, err error) { + if l.size <= 0 { + return 0, io.EOF + } + if int64(len(p)) > l.size { + n = int(l.size) + } else { + n = len(p) + } + if n >= 8 { + for i := 0; i < 8; i++ { + p[i] = byte(l.id >> uint(i*8)) + } + } + l.size -= int64(n) + return +} + +func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) { + size := int(l.size) + bufferSize := len(sharedBytes) + for size > 0 { + tempBuffer := sharedBytes + if size < bufferSize { + tempBuffer = sharedBytes[0:size] + } + count, e := w.Write(tempBuffer) + if e != nil { + return int64(size), e + } + size -= count + } + return l.size, nil +} + +func Readln(r *bufio.Reader) ([]byte, error) { + var ( + isPrefix = true + err error + line, ln []byte + ) + for isPrefix && err == nil { + line, isPrefix, err = r.ReadLine() + ln = append(ln, line...) + } + return ln, err +} diff --git a/weed/command/command.go b/weed/command/command.go new file mode 100644 index 000000000..d654f57cd --- /dev/null +++ b/weed/command/command.go @@ -0,0 +1,71 @@ +package command + +import ( + "flag" + "fmt" + "os" + "strings" +) + +var Commands = []*Command{ + cmdBenchmark, + cmdBackup, + cmdCompact, + cmdFix, + cmdServer, + cmdMaster, + cmdFiler, + cmdUpload, + cmdDownload, + cmdShell, + cmdVersion, + cmdVolume, + cmdExport, + cmdMount, +} + +type Command struct { + // Run runs the command. + // The args are the arguments after the command name. + Run func(cmd *Command, args []string) bool + + // UsageLine is the one-line usage message. + // The first word in the line is taken to be the command name. + UsageLine string + + // Short is the short description shown in the 'go help' output. + Short string + + // Long is the long message shown in the 'go help <this-command>' output. + Long string + + // Flag is a set of flags specific to this command. + Flag flag.FlagSet + + IsDebug *bool +} + +// Name returns the command's name: the first word in the usage line. +func (c *Command) Name() string { + name := c.UsageLine + i := strings.Index(name, " ") + if i >= 0 { + name = name[:i] + } + return name +} + +func (c *Command) Usage() { + fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine) + fmt.Fprintf(os.Stderr, "Default Usage:\n") + c.Flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "Description:\n") + fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long)) + os.Exit(2) +} + +// Runnable reports whether the command can be run; otherwise +// it is a documentation pseudo-command such as importpath. +func (c *Command) Runnable() bool { + return c.Run != nil +} diff --git a/weed/command/compact.go b/weed/command/compact.go new file mode 100644 index 000000000..ba2fbf867 --- /dev/null +++ b/weed/command/compact.go @@ -0,0 +1,45 @@ +package command + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func init() { + cmdCompact.Run = runCompact // break init cycle +} + +var cmdCompact = &Command{ + UsageLine: "compact -dir=/tmp -volumeId=234", + Short: "run weed tool compact on volume file", + Long: `Force an compaction to remove deleted files from volume files. + The compacted .dat file is stored as .cpd file. + The compacted .idx file is stored as .cpx file. + + `, +} + +var ( + compactVolumePath = cmdCompact.Flag.String("dir", ".", "data directory to store files") + compactVolumeCollection = cmdCompact.Flag.String("collection", "", "volume collection name") + compactVolumeId = cmdCompact.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir.") +) + +func runCompact(cmd *Command, args []string) bool { + + if *compactVolumeId == -1 { + return false + } + + vid := storage.VolumeId(*compactVolumeId) + v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, + storage.NeedleMapInMemory, nil, nil) + if err != nil { + glog.Fatalf("Load Volume [ERROR] %s\n", err) + } + if err = v.Compact(); err != nil { + glog.Fatalf("Compact Volume [ERROR] %s\n", err) + } + + return true +} diff --git a/weed/command/download.go b/weed/command/download.go new file mode 100644 index 000000000..39ed2b38e --- /dev/null +++ b/weed/command/download.go @@ -0,0 +1,130 @@ +package command + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strings" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + d DownloadOptions +) + +type DownloadOptions struct { + server *string + dir *string +} + +func init() { + cmdDownload.Run = runDownload // break init cycle + d.server = cmdDownload.Flag.String("server", "localhost:9333", "SeaweedFS master location") + d.dir = cmdDownload.Flag.String("dir", ".", "Download the whole folder recursively if specified.") +} + +var cmdDownload = &Command{ + UsageLine: "download -server=localhost:9333 -dir=one_directory fid1 [fid2 fid3 ...]", + Short: "download files by file id", + Long: `download files by file id. + + Usually you just need to use curl to lookup the file's volume server, and then download them directly. + This download tool combine the two steps into one. + + What's more, if you use "weed upload -maxMB=..." option to upload a big file divided into chunks, you can + use this tool to download the chunks and merge them automatically. + + `, +} + +func runDownload(cmd *Command, args []string) bool { + for _, fid := range args { + if e := downloadToFile(*d.server, fid, *d.dir); e != nil { + fmt.Println("Download Error: ", fid, e) + } + } + return true +} + +func downloadToFile(server, fileId, saveDir string) error { + fileUrl, lookupError := operation.LookupFileId(server, fileId) + if lookupError != nil { + return lookupError + } + filename, rc, err := util.DownloadUrl(fileUrl) + if err != nil { + return err + } + defer rc.Close() + if filename == "" { + filename = fileId + } + isFileList := false + if strings.HasSuffix(filename, "-list") { + // old command compatible + isFileList = true + filename = filename[0 : len(filename)-len("-list")] + } + f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm) + if err != nil { + return err + } + defer f.Close() + if isFileList { + content, err := ioutil.ReadAll(rc) + if err != nil { + return err + } + fids := strings.Split(string(content), "\n") + for _, partId := range fids { + var n int + _, part, err := fetchContent(*d.server, partId) + if err == nil { + n, err = f.Write(part) + } + if err == nil && n < len(part) { + err = io.ErrShortWrite + } + if err != nil { + return err + } + } + } else { + if _, err = io.Copy(f, rc); err != nil { + return err + } + + } + return nil +} + +func fetchContent(server string, fileId string) (filename string, content []byte, e error) { + fileUrl, lookupError := operation.LookupFileId(server, fileId) + if lookupError != nil { + return "", nil, lookupError + } + var rc io.ReadCloser + if filename, rc, e = util.DownloadUrl(fileUrl); e != nil { + return "", nil, e + } + content, e = ioutil.ReadAll(rc) + rc.Close() + return +} + +func WriteFile(filename string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + n, err := f.Write(data) + f.Close() + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + return err +} diff --git a/weed/command/export.go b/weed/command/export.go new file mode 100644 index 000000000..481aa111b --- /dev/null +++ b/weed/command/export.go @@ -0,0 +1,213 @@ +package command + +import ( + "archive/tar" + "bytes" + "fmt" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "text/template" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +const ( + defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}` + timeFormat = "2006-01-02T15:04:05" +) + +var ( + export ExportOptions +) + +type ExportOptions struct { + dir *string + collection *string + volumeId *int +} + +var cmdExport = &Command{ + UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}} -newer='" + timeFormat + "'", + Short: "list or export files from one volume data file", + Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified. + + The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{.Key}}. + + `, +} + +func init() { + cmdExport.Run = runExport // break init cycle + export.dir = cmdExport.Flag.String("dir", ".", "input data directory to store volume data files") + export.collection = cmdExport.Flag.String("collection", "", "the volume collection name") + export.volumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.") +} + +var ( + output = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout") + format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename formatted with {{.Mime}} {{.Id}} {{.Name}} {{.Ext}}") + newer = cmdExport.Flag.String("newer", "", "export only files newer than this time, default is all files. Must be specified in RFC3339 without timezone") + + tarOutputFile *tar.Writer + tarHeader tar.Header + fileNameTemplate *template.Template + fileNameTemplateBuffer = bytes.NewBuffer(nil) + newerThan time.Time + newerThanUnix int64 = -1 + localLocation, _ = time.LoadLocation("Local") +) + +func runExport(cmd *Command, args []string) bool { + + var err error + + if *newer != "" { + if newerThan, err = time.ParseInLocation(timeFormat, *newer, localLocation); err != nil { + fmt.Println("cannot parse 'newer' argument: " + err.Error()) + return false + } + newerThanUnix = newerThan.Unix() + } + + if *export.volumeId == -1 { + return false + } + + if *output != "" { + if *output != "-" && !strings.HasSuffix(*output, ".tar") { + fmt.Println("the output file", *output, "should be '-' or end with .tar") + return false + } + + if fileNameTemplate, err = template.New("name").Parse(*format); err != nil { + fmt.Println("cannot parse format " + *format + ": " + err.Error()) + return false + } + + var outputFile *os.File + if *output == "-" { + outputFile = os.Stdout + } else { + if outputFile, err = os.Create(*output); err != nil { + glog.Fatalf("cannot open output tar %s: %s", *output, err) + } + } + defer outputFile.Close() + tarOutputFile = tar.NewWriter(outputFile) + defer tarOutputFile.Close() + t := time.Now() + tarHeader = tar.Header{Mode: 0644, + ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(), + Typeflag: tar.TypeReg, + AccessTime: t, ChangeTime: t} + } + + fileName := strconv.Itoa(*export.volumeId) + if *export.collection != "" { + fileName = *export.collection + "_" + fileName + } + vid := storage.VolumeId(*export.volumeId) + indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644) + if err != nil { + glog.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + needleMap, err := storage.LoadNeedleMap(indexFile) + if err != nil { + glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err) + } + + var version storage.Version + + err = storage.ScanVolumeFile(*export.dir, *export.collection, vid, + storage.NeedleMapInMemory, + func(superBlock storage.SuperBlock) error { + version = superBlock.Version() + return nil + }, true, func(n *storage.Needle, offset int64) error { + nv, ok := needleMap.Get(n.Id) + glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", + n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped(), ok, nv) + if ok && nv.Size > 0 && int64(nv.Offset)*8 == offset { + if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) { + glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d", + n.LastModified, newerThanUnix) + return nil + } + return walker(vid, n, version) + } + if !ok { + glog.V(2).Infof("This seems deleted %d size %d", n.Id, n.Size) + } else { + glog.V(2).Infof("Skipping later-updated Id %d size %d", n.Id, n.Size) + } + return nil + }) + if err != nil { + glog.Fatalf("Export Volume File [ERROR] %s\n", err) + } + return true +} + +type nameParams struct { + Name string + Id uint64 + Mime string + Key string + Ext string +} + +func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) { + key := storage.NewFileIdFromNeedle(vid, n).String() + if tarOutputFile != nil { + fileNameTemplateBuffer.Reset() + if err = fileNameTemplate.Execute(fileNameTemplateBuffer, + nameParams{ + Name: string(n.Name), + Id: n.Id, + Mime: string(n.Mime), + Key: key, + Ext: filepath.Ext(string(n.Name)), + }, + ); err != nil { + return err + } + + fileName := fileNameTemplateBuffer.String() + + if n.IsGzipped() && path.Ext(fileName) != ".gz" { + fileName = fileName + ".gz" + } + + tarHeader.Name, tarHeader.Size = fileName, int64(len(n.Data)) + if n.HasLastModifiedDate() { + tarHeader.ModTime = time.Unix(int64(n.LastModified), 0) + } else { + tarHeader.ModTime = time.Unix(0, 0) + } + tarHeader.ChangeTime = tarHeader.ModTime + if err = tarOutputFile.WriteHeader(&tarHeader); err != nil { + return err + } + _, err = tarOutputFile.Write(n.Data) + } else { + size := n.DataSize + if version == storage.Version1 { + size = n.Size + } + fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n", + key, + n.Name, + size, + n.IsGzipped(), + n.Mime, + ) + } + return +} diff --git a/weed/command/filer.go b/weed/command/filer.go new file mode 100644 index 000000000..582d4e9c8 --- /dev/null +++ b/weed/command/filer.go @@ -0,0 +1,105 @@ +package command + +import ( + "net/http" + "os" + "strconv" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + f FilerOptions +) + +type FilerOptions struct { + master *string + ip *string + port *int + collection *string + defaultReplicaPlacement *string + dir *string + redirectOnRead *bool + disableDirListing *bool + secretKey *string + cassandra_server *string + cassandra_keyspace *string + redis_server *string + redis_password *string + redis_database *int +} + +func init() { + cmdFiler.Run = runFiler // break init cycle + f.master = cmdFiler.Flag.String("master", "localhost:9333", "master server location") + f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") + f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") + f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") + f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data") + f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") + f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") + f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") + f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") + f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") + f.redis_password = cmdFiler.Flag.String("redis.password", "", "password in clear text") + f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") + f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + +} + +var cmdFiler = &Command{ + UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>", + Short: "start a file server that points to a master server", + Long: `start a file server which accepts REST operation for any files. + + //create or overwrite the file, the directories /path/to will be automatically created + POST /path/to/file + //get the file content + GET /path/to/file + //create or overwrite the file, the filename in the multipart request will be used + POST /path/to/ + //return a json format subdirectory and files listing + GET /path/to/ + + Current <fullpath~fileid> mapping metadata store is local embedded leveldb. + It should be highly scalable to hundreds of millions of files on a modest machine. + + Future we will ensure it can avoid of being SPOF. + + `, +} + +func runFiler(cmd *Command, args []string) bool { + + if err := util.TestFolderWritable(*f.dir); err != nil { + glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err) + } + + r := http.NewServeMux() + _, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection, + *f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing, + *f.secretKey, + *f.cassandra_server, *f.cassandra_keyspace, + *f.redis_server, *f.redis_password, *f.redis_database, + ) + if nfs_err != nil { + glog.Fatalf("Filer startup error: %v", nfs_err) + } + glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*f.port)) + filerListener, e := util.NewListener( + ":"+strconv.Itoa(*f.port), + time.Duration(10)*time.Second, + ) + if e != nil { + glog.Fatalf("Filer listener error: %v", e) + } + if e := http.Serve(filerListener, r); e != nil { + glog.Fatalf("Filer Fail to serve: %v", e) + } + + return true +} diff --git a/weed/command/fix.go b/weed/command/fix.go new file mode 100644 index 000000000..2ec74d026 --- /dev/null +++ b/weed/command/fix.go @@ -0,0 +1,70 @@ +package command + +import ( + "os" + "path" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func init() { + cmdFix.Run = runFix // break init cycle +} + +var cmdFix = &Command{ + UsageLine: "fix -dir=/tmp -volumeId=234", + Short: "run weed tool fix on index file if corrupted", + Long: `Fix runs the SeaweedFS fix command to re-create the index .idx file. + + `, +} + +var ( + fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files") + fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name") + fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func runFix(cmd *Command, args []string) bool { + + if *fixVolumeId == -1 { + return false + } + + fileName := strconv.Itoa(*fixVolumeId) + if *fixVolumeCollection != "" { + fileName = *fixVolumeCollection + "_" + fileName + } + indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + glog.Fatalf("Create Volume Index [ERROR] %s\n", err) + } + defer indexFile.Close() + + nm := storage.NewNeedleMap(indexFile) + defer nm.Close() + + vid := storage.VolumeId(*fixVolumeId) + err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, + storage.NeedleMapInMemory, + func(superBlock storage.SuperBlock) error { + return nil + }, false, func(n *storage.Needle, offset int64) error { + glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(), n.IsGzipped()) + if n.Size > 0 { + pe := nm.Put(n.Id, uint32(offset/storage.NeedlePaddingSize), n.Size) + glog.V(2).Infof("saved %d with error %v", n.Size, pe) + } else { + glog.V(2).Infof("skipping deleted file ...") + return nm.Delete(n.Id) + } + return nil + }) + if err != nil { + glog.Fatalf("Export Volume File [ERROR] %s\n", err) + } + + return true +} diff --git a/weed/command/master.go b/weed/command/master.go new file mode 100644 index 000000000..aed8fc793 --- /dev/null +++ b/weed/command/master.go @@ -0,0 +1,91 @@ +package command + +import ( + "net/http" + "os" + "runtime" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" +) + +func init() { + cmdMaster.Run = runMaster // break init cycle +} + +var cmdMaster = &Command{ + UsageLine: "master -port=9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service + and sequence number of file ids + + `, +} + +var ( + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + masterIp = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address") + masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") + masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094") + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "Deprecating! xml configuration file") + defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.") + mTimeout = cmdMaster.Flag.Int("idleTimeout", 10, "connection idle seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") + masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + + masterWhiteList []string +) + +func runMaster(cmd *Command, args []string) bool { + if *mMaxCpu < 1 { + *mMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*mMaxCpu) + if err := util.TestFolderWritable(*metaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err) + } + if *masterWhiteListOption != "" { + masterWhiteList = strings.Split(*masterWhiteListOption, ",") + } + + r := mux.NewRouter() + ms := weed_server.NewMasterServer(r, *mport, *metaFolder, + *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, + masterWhiteList, *masterSecureKey, + ) + + listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport) + + glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress) + + listener, e := util.NewListener(listeningAddress, time.Duration(*mTimeout)*time.Second) + if e != nil { + glog.Fatalf("Master startup error: %v", e) + } + + go func() { + time.Sleep(100 * time.Millisecond) + myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport) + var peers []string + if *masterPeers != "" { + peers = strings.Split(*masterPeers, ",") + } + raftServer := weed_server.NewRaftServer(r, peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse) + ms.SetRaftServer(raftServer) + }() + + if e := http.Serve(listener, r); e != nil { + glog.Fatalf("Fail to serve: %v", e) + } + return true +} diff --git a/weed/command/mount.go b/weed/command/mount.go new file mode 100644 index 000000000..d6e87d76c --- /dev/null +++ b/weed/command/mount.go @@ -0,0 +1,35 @@ +package command + +type MountOptions struct { + filer *string + dir *string +} + +var ( + mountOptions MountOptions +) + +func init() { + cmdMount.Run = runMount // break init cycle + cmdMount.IsDebug = cmdMount.Flag.Bool("debug", false, "verbose debug information") + mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") + mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") +} + +var cmdMount = &Command{ + UsageLine: "mount -filer=localhost:8888 -dir=/some/dir", + Short: "mount weed filer to a directory as file system in userspace(FUSE)", + Long: `mount weed filer to userspace. + + Pre-requisites: + 1) have SeaweedFS master and volume servers running + 2) have a "weed filer" running + These 2 requirements can be achieved with one command "weed server -filer=true" + + This uses bazil.org/fuse, whichenables writing FUSE file systems on + Linux, and OS X. + + On OS X, it requires OSXFUSE (http://osxfuse.github.com/). + + `, +} diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go new file mode 100644 index 000000000..3bf22ddc4 --- /dev/null +++ b/weed/command/mount_notsupported.go @@ -0,0 +1,15 @@ +// +build !linux +// +build !darwin + +package command + +import ( + "fmt" + "runtime" +) + +func runMount(cmd *Command, args []string) bool { + fmt.Printf("Mount is not supported on %s %s\n", runtime.GOOS, runtime.GOARCH) + + return true +} diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go new file mode 100644 index 000000000..b086d8cbf --- /dev/null +++ b/weed/command/mount_std.go @@ -0,0 +1,106 @@ +// +build linux darwin + +package command + +import ( + "fmt" + "runtime" + + "bazil.org/fuse" + "bazil.org/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" + "golang.org/x/net/context" +) + +func runMount(cmd *Command, args []string) bool { + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + if *mountOptions.dir == "" { + fmt.Printf("Please specify the mount directory via \"-dir\"") + return false + } + + c, err := fuse.Mount(*mountOptions.dir) + if err != nil { + glog.Fatal(err) + return false + } + + OnInterrupt(func() { + fuse.Unmount(*mountOptions.dir) + c.Close() + }) + + err = fs.Serve(c, WFS{}) + if err != nil { + fuse.Unmount(*mountOptions.dir) + } + + // check if the mount process has an error to report + <-c.Ready + if err := c.MountError; err != nil { + glog.Fatal(err) + } + + return true +} + +type File struct { + FileId filer.FileId + Name string +} + +func (File) Attr(context context.Context, attr *fuse.Attr) error { + return nil +} +func (File) ReadAll(ctx context.Context) ([]byte, error) { + return []byte("hello, world\n"), nil +} + +type Dir struct { + Path string + Id uint64 +} + +func (dir Dir) Attr(context context.Context, attr *fuse.Attr) error { + return nil +} + +func (dir Dir) Lookup(ctx context.Context, name string) (fs.Node, error) { + files_result, e := filer.ListFiles(*mountOptions.filer, dir.Path, name) + if e != nil { + return nil, fuse.ENOENT + } + if len(files_result.Files) > 0 { + return File{files_result.Files[0].Id, files_result.Files[0].Name}, nil + } + return nil, fmt.Errorf("File Not Found for %s", name) +} + +type WFS struct{} + +func (WFS) Root() (fs.Node, error) { + return Dir{}, nil +} + +func (dir *Dir) ReadDir(ctx context.Context) ([]fuse.Dirent, error) { + var ret []fuse.Dirent + if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil { + for _, d := range dirs.Directories { + dirId := uint64(d.Id) + ret = append(ret, fuse.Dirent{Inode: dirId, Name: d.Name, Type: fuse.DT_Dir}) + } + } + if files, e := filer.ListFiles(*mountOptions.filer, dir.Path, ""); e == nil { + for _, f := range files.Files { + if fileId, e := storage.ParseFileId(string(f.Id)); e == nil { + fileInode := uint64(fileId.VolumeId)<<48 + fileId.Key + ret = append(ret, fuse.Dirent{Inode: fileInode, Name: f.Name, Type: fuse.DT_File}) + } + + } + } + return ret, nil +} diff --git a/weed/command/server.go b/weed/command/server.go new file mode 100644 index 000000000..6ed1e5228 --- /dev/null +++ b/weed/command/server.go @@ -0,0 +1,291 @@ +package command + +import ( + "net/http" + "os" + "runtime" + "runtime/pprof" + "strconv" + "strings" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" +) + +type ServerOptions struct { + cpuprofile *string +} + +var ( + serverOptions ServerOptions + filerOptions FilerOptions +) + +func init() { + cmdServer.Run = runServer // break init cycle +} + +var cmdServer = &Command{ + UsageLine: "server -port=8080 -dir=/tmp -volume.max=5 -ip=server_name", + Short: "start a server, including volume server, and automatically elect a master server", + Long: `start both a volume server to provide storage spaces + and a master server to provide volume=>location mapping service and sequence number of file ids + + This is provided as a convenient way to start both volume server and master server. + The servers are exactly the same as starting them separately. + + So other volume servers can use this embedded master server also. + + Optionally, one filer server can be started. Logically, filer servers should not be in a cluster. + They run with meta data on disk, not shared. So each filer server is different. + + `, +} + +var ( + serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name") + serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + serverTimeout = cmdServer.Flag.Int("idleTimeout", 10, "connection idle seconds") + serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name") + serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name") + serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") + serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list") + serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") + masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port") + masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified") + masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.") + masterConfFile = cmdServer.Flag.String("master.conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.") + volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") + volumePublicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") + volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") + volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...") + volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") + volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") + volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") + + serverWhiteList []string +) + +func init() { + serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") + filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") + filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") + filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") + filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified") + filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") + filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") + filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") + filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") + filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") + filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") + filerOptions.redis_password = cmdServer.Flag.String("filer.redis.password", "", "redis password in clear text") + filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") +} + +func runServer(cmd *Command, args []string) bool { + filerOptions.secretKey = serverSecureKey + if *serverOptions.cpuprofile != "" { + f, err := os.Create(*serverOptions.cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + if *filerOptions.redirectOnRead { + *isStartingFiler = true + } + + *filerOptions.master = *serverIp + ":" + strconv.Itoa(*masterPort) + + if *filerOptions.defaultReplicaPlacement == "" { + *filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement + } + + if *volumePublicPort == 0 { + *volumePublicPort = *volumePort + } + + if *serverMaxCpu < 1 { + *serverMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*serverMaxCpu) + + folders := strings.Split(*volumeDataFolders, ",") + maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",") + var maxCounts []int + for _, maxString := range maxCountStrings { + if max, e := strconv.Atoi(maxString); e == nil { + maxCounts = append(maxCounts, max) + } else { + glog.Fatalf("The max specified in -max not a valid number %s", maxString) + } + } + if len(folders) != len(maxCounts) { + glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts)) + } + for _, folder := range folders { + if err := util.TestFolderWritable(folder); err != nil { + glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) + } + } + + if *masterMetaFolder == "" { + *masterMetaFolder = folders[0] + } + if *isStartingFiler { + if *filerOptions.dir == "" { + *filerOptions.dir = *masterMetaFolder + "/filer" + os.MkdirAll(*filerOptions.dir, 0700) + } + if err := util.TestFolderWritable(*filerOptions.dir); err != nil { + glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) + } + } + if err := util.TestFolderWritable(*masterMetaFolder); err != nil { + glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) + } + + if *serverWhiteListOption != "" { + serverWhiteList = strings.Split(*serverWhiteListOption, ",") + } + + if *isStartingFiler { + go func() { + r := http.NewServeMux() + _, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection, + *filerOptions.defaultReplicaPlacement, + *filerOptions.redirectOnRead, *filerOptions.disableDirListing, + *filerOptions.secretKey, + *filerOptions.cassandra_server, *filerOptions.cassandra_keyspace, + *filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database, + ) + if nfs_err != nil { + glog.Fatalf("Filer startup error: %v", nfs_err) + } + glog.V(0).Infoln("Start Seaweed Filer", util.VERSION, "at port", strconv.Itoa(*filerOptions.port)) + filerListener, e := util.NewListener( + ":"+strconv.Itoa(*filerOptions.port), + time.Duration(10)*time.Second, + ) + if e != nil { + glog.Fatalf("Filer listener error: %v", e) + } + if e := http.Serve(filerListener, r); e != nil { + glog.Fatalf("Filer Fail to serve: %v", e) + } + }() + } + + var raftWaitForMaster sync.WaitGroup + var volumeWait sync.WaitGroup + + raftWaitForMaster.Add(1) + volumeWait.Add(1) + + go func() { + r := mux.NewRouter() + ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder, + *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *serverGarbageThreshold, + serverWhiteList, *serverSecureKey, + ) + + glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*masterPort)) + masterListener, e := util.NewListener(*serverBindIp+":"+strconv.Itoa(*masterPort), time.Duration(*serverTimeout)*time.Second) + if e != nil { + glog.Fatalf("Master startup error: %v", e) + } + + go func() { + raftWaitForMaster.Wait() + time.Sleep(100 * time.Millisecond) + myAddress := *serverIp + ":" + strconv.Itoa(*masterPort) + var peers []string + if *serverPeers != "" { + peers = strings.Split(*serverPeers, ",") + } + raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse) + ms.SetRaftServer(raftServer) + volumeWait.Done() + }() + + raftWaitForMaster.Done() + if e := http.Serve(masterListener, r); e != nil { + glog.Fatalf("Master Fail to serve:%s", e.Error()) + } + }() + + volumeWait.Wait() + time.Sleep(100 * time.Millisecond) + if *volumePublicPort == 0 { + *volumePublicPort = *volumePort + } + if *volumeServerPublicUrl == "" { + *volumeServerPublicUrl = *serverIp + ":" + strconv.Itoa(*volumePublicPort) + } + isSeperatedPublicPort := *volumePublicPort != *volumePort + volumeMux := http.NewServeMux() + publicVolumeMux := volumeMux + if isSeperatedPublicPort { + publicVolumeMux = http.NewServeMux() + } + volumeNeedleMapKind := storage.NeedleMapInMemory + switch *volumeIndexType { + case "leveldb": + volumeNeedleMapKind = storage.NeedleMapLevelDb + case "boltdb": + volumeNeedleMapKind = storage.NeedleMapBoltDb + } + volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, + *serverIp, *volumePort, *volumeServerPublicUrl, + folders, maxCounts, + volumeNeedleMapKind, + *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, + serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, + ) + + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort)) + volumeListener, eListen := util.NewListener( + *serverBindIp+":"+strconv.Itoa(*volumePort), + time.Duration(*serverTimeout)*time.Second, + ) + if eListen != nil { + glog.Fatalf("Volume server listener error: %v", eListen) + } + if isSeperatedPublicPort { + publicListeningAddress := *serverIp + ":" + strconv.Itoa(*volumePublicPort) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*serverTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + go func() { + if e := http.Serve(publicListener, publicVolumeMux); e != nil { + glog.Fatalf("Volume server fail to serve public: %v", e) + } + }() + } + + OnInterrupt(func() { + volumeServer.Shutdown() + pprof.StopCPUProfile() + }) + + if e := http.Serve(volumeListener, volumeMux); e != nil { + glog.Fatalf("Volume server fail to serve:%v", e) + } + + return true +} diff --git a/weed/command/shell.go b/weed/command/shell.go new file mode 100644 index 000000000..19c5049c5 --- /dev/null +++ b/weed/command/shell.go @@ -0,0 +1,61 @@ +package command + +import ( + "bufio" + "fmt" + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func init() { + cmdShell.Run = runShell // break init cycle +} + +var cmdShell = &Command{ + UsageLine: "shell", + Short: "run interactive commands, now just echo", + Long: `run interactive commands. + + `, +} + +var () + +func runShell(command *Command, args []string) bool { + r := bufio.NewReader(os.Stdin) + o := bufio.NewWriter(os.Stdout) + e := bufio.NewWriter(os.Stderr) + prompt := func() { + var err error + if _, err = o.WriteString("> "); err != nil { + glog.V(0).Infoln("error writing to stdout:", err) + } + if err = o.Flush(); err != nil { + glog.V(0).Infoln("error flushing stdout:", err) + } + } + readLine := func() string { + ret, err := r.ReadString('\n') + if err != nil { + fmt.Fprint(e, err) + os.Exit(1) + } + return ret + } + execCmd := func(cmd string) int { + if cmd != "" { + if _, err := o.WriteString(cmd); err != nil { + glog.V(0).Infoln("error writing to stdout:", err) + } + } + return 0 + } + + cmd := "" + for { + prompt() + cmd = readLine() + execCmd(cmd) + } +} diff --git a/weed/command/signal_handling.go b/weed/command/signal_handling.go new file mode 100644 index 000000000..182e2754d --- /dev/null +++ b/weed/command/signal_handling.go @@ -0,0 +1,31 @@ +// +build !plan9 + +package command + +import ( + "os" + "os/signal" + "syscall" +) + +func OnInterrupt(fn func()) { + // deal with control+c,etc + signalChan := make(chan os.Signal, 1) + // controlling terminal close, daemon not exit + signal.Ignore(syscall.SIGHUP) + signal.Notify(signalChan, + os.Interrupt, + os.Kill, + syscall.SIGALRM, + // syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + // syscall.SIGQUIT, + ) + go func() { + for _ = range signalChan { + fn() + os.Exit(0) + } + }() +} diff --git a/weed/command/signal_handling_notsupported.go b/weed/command/signal_handling_notsupported.go new file mode 100644 index 000000000..dfcc24a3e --- /dev/null +++ b/weed/command/signal_handling_notsupported.go @@ -0,0 +1,6 @@ +// +build plan9 + +package command + +func OnInterrupt(fn func()) { +} diff --git a/weed/command/upload.go b/weed/command/upload.go new file mode 100644 index 000000000..0dfa115bb --- /dev/null +++ b/weed/command/upload.go @@ -0,0 +1,108 @@ +package command + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" +) + +var ( + upload UploadOptions +) + +type UploadOptions struct { + server *string + dir *string + include *string + replication *string + collection *string + ttl *string + maxMB *int + secretKey *string +} + +func init() { + cmdUpload.Run = runUpload // break init cycle + cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information") + upload.server = cmdUpload.Flag.String("server", "localhost:9333", "SeaweedFS master location") + upload.dir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.") + upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") + upload.replication = cmdUpload.Flag.String("replication", "", "replication type") + upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name") + upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") + upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit") + upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") +} + +var cmdUpload = &Command{ + UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf", + Short: "upload one or a list of files", + Long: `upload one or a list of files, or batch upload one whole folder recursively. + + If uploading a list of files: + It uses consecutive file keys for the list of files. + e.g. If the file1 uses key k, file2 can be read via k_1 + + If uploading a whole folder recursively: + All files under the folder and subfolders will be uploaded, each with its own file key. + Optional parameter "-include" allows you to specify the file name patterns. + + If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is. + This can save volume server's gzipped processing and allow customizable gzip compression level. + The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js". + + If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly. + The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned. + + `, +} + +func runUpload(cmd *Command, args []string) bool { + secret := security.Secret(*upload.secretKey) + if len(cmdUpload.Flag.Args()) == 0 { + if *upload.dir == "" { + return false + } + filepath.Walk(*upload.dir, func(path string, info os.FileInfo, err error) error { + if err == nil { + if !info.IsDir() { + if *upload.include != "" { + if ok, _ := filepath.Match(*upload.include, filepath.Base(path)); !ok { + return nil + } + } + parts, e := operation.NewFileParts([]string{path}) + if e != nil { + return e + } + results, e := operation.SubmitFiles(*upload.server, parts, + *upload.replication, *upload.collection, + *upload.ttl, *upload.maxMB, secret) + bytes, _ := json.Marshal(results) + fmt.Println(string(bytes)) + if e != nil { + return e + } + } + } else { + fmt.Println(err) + } + return err + }) + } else { + parts, e := operation.NewFileParts(args) + if e != nil { + fmt.Println(e.Error()) + } + results, _ := operation.SubmitFiles(*upload.server, parts, + *upload.replication, *upload.collection, + *upload.ttl, *upload.maxMB, secret) + bytes, _ := json.Marshal(results) + fmt.Println(string(bytes)) + } + return true +} diff --git a/weed/command/version.go b/weed/command/version.go new file mode 100644 index 000000000..8fdd68ec8 --- /dev/null +++ b/weed/command/version.go @@ -0,0 +1,24 @@ +package command + +import ( + "fmt" + "runtime" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +var cmdVersion = &Command{ + Run: runVersion, + UsageLine: "version", + Short: "print SeaweedFS version", + Long: `Version prints the SeaweedFS version`, +} + +func runVersion(cmd *Command, args []string) bool { + if len(args) != 0 { + cmd.Usage() + } + + fmt.Printf("version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + return true +} diff --git a/weed/command/volume.go b/weed/command/volume.go new file mode 100644 index 000000000..21369cbe9 --- /dev/null +++ b/weed/command/volume.go @@ -0,0 +1,165 @@ +package command + +import ( + "net/http" + "os" + "runtime" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + v VolumeServerOptions +) + +type VolumeServerOptions struct { + port *int + publicPort *int + folders []string + folderMaxLimits []int + ip *string + publicUrl *string + bindIp *string + master *string + pulseSeconds *int + idleConnectionTimeout *int + maxCpu *int + dataCenter *string + rack *string + whiteList []string + indexType *string + fixJpgOrientation *bool + readRedirect *bool +} + +func init() { + cmdVolume.Run = runVolume // break init cycle + v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") + v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") + v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") + v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") + v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") + v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") + v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 10, "connection idle seconds") + v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") + v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") + v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") + v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") + v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") +} + +var cmdVolume = &Command{ + UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333", + Short: "start a volume server", + Long: `start a volume server to provide storage spaces + + `, +} + +var ( + volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") + maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...") + volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") +) + +func runVolume(cmd *Command, args []string) bool { + if *v.maxCpu < 1 { + *v.maxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*v.maxCpu) + + //Set multiple folders and each folder's max volume count limit' + v.folders = strings.Split(*volumeFolders, ",") + maxCountStrings := strings.Split(*maxVolumeCounts, ",") + for _, maxString := range maxCountStrings { + if max, e := strconv.Atoi(maxString); e == nil { + v.folderMaxLimits = append(v.folderMaxLimits, max) + } else { + glog.Fatalf("The max specified in -max not a valid number %s", maxString) + } + } + if len(v.folders) != len(v.folderMaxLimits) { + glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) + } + for _, folder := range v.folders { + if err := util.TestFolderWritable(folder); err != nil { + glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) + } + } + + //security related white list configuration + if *volumeWhiteListOption != "" { + v.whiteList = strings.Split(*volumeWhiteListOption, ",") + } + + if *v.ip == "" { + *v.ip = "127.0.0.1" + } + + if *v.publicPort == 0 { + *v.publicPort = *v.port + } + if *v.publicUrl == "" { + *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) + } + isSeperatedPublicPort := *v.publicPort != *v.port + + volumeMux := http.NewServeMux() + publicVolumeMux := volumeMux + if isSeperatedPublicPort { + publicVolumeMux = http.NewServeMux() + } + + volumeNeedleMapKind := storage.NeedleMapInMemory + switch *v.indexType { + case "leveldb": + volumeNeedleMapKind = storage.NeedleMapLevelDb + case "boltdb": + volumeNeedleMapKind = storage.NeedleMapBoltDb + } + volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, + *v.ip, *v.port, *v.publicUrl, + v.folders, v.folderMaxLimits, + volumeNeedleMapKind, + *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, + v.whiteList, + *v.fixJpgOrientation, *v.readRedirect, + ) + + listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", listeningAddress) + listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + if isSeperatedPublicPort { + publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) + publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) + if e != nil { + glog.Fatalf("Volume server listener error:%v", e) + } + go func() { + if e := http.Serve(publicListener, publicVolumeMux); e != nil { + glog.Fatalf("Volume server fail to serve public: %v", e) + } + }() + } + + OnInterrupt(func() { + volumeServer.Shutdown() + }) + + if e := http.Serve(listener, volumeMux); e != nil { + glog.Fatalf("Volume server fail to serve: %v", e) + } + return true +} diff --git a/weed/command/volume_test.go b/weed/command/volume_test.go new file mode 100644 index 000000000..7399f1248 --- /dev/null +++ b/weed/command/volume_test.go @@ -0,0 +1,13 @@ +package command + +import ( + "net/http" + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func TestXYZ(t *testing.T) { + glog.V(0).Infoln("Last-Modified", time.Unix(int64(1373273596), 0).UTC().Format(http.TimeFormat)) +} |
