aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go12
-rw-r--r--weed/command/benchmark.go62
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/compact.go5
-rw-r--r--weed/command/download.go1
-rw-r--r--weed/command/export.go24
-rw-r--r--weed/command/filer.go16
-rw-r--r--weed/command/filer_copy.go179
-rw-r--r--weed/command/filer_replication.go17
-rw-r--r--weed/command/fix.go29
-rw-r--r--weed/command/master.go14
-rw-r--r--weed/command/mount.go54
-rw-r--r--weed/command/mount_darwin.go4
-rw-r--r--weed/command/mount_freebsd.go4
-rw-r--r--weed/command/mount_linux.go146
-rw-r--r--weed/command/mount_std.go86
-rw-r--r--weed/command/msg_broker.go107
-rw-r--r--weed/command/s3.go114
-rw-r--r--weed/command/scaffold.go78
-rw-r--r--weed/command/scaffold_test.go44
-rw-r--r--weed/command/server.go9
-rw-r--r--weed/command/shell.go3
-rw-r--r--weed/command/upload.go6
-rw-r--r--weed/command/volume.go158
-rw-r--r--weed/command/webdav.go44
25 files changed, 927 insertions, 290 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index cef2bbe3a..eb2b5ba4a 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -5,8 +5,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -64,7 +64,7 @@ var cmdBackup = &Command{
func runBackup(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if *s.volumeId == -1 {
return false
@@ -98,15 +98,15 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
- var replication *storage.ReplicaPlacement
+ var replication *super_block.ReplicaPlacement
if *s.replication != "" {
- replication, err = storage.NewReplicaPlacementFromString(*s.replication)
+ replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
if err != nil {
fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
return true
}
} else {
- replication, err = storage.NewReplicaPlacementFromString(stats.Replication)
+ replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
if err != nil {
fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
return true
@@ -119,7 +119,7 @@ func runBackup(cmd *Command, args []string) bool {
}
if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
- if err = v.Compact(0, 0); err != nil {
+ if err = v.Compact2(30 * 1024 * 1024 * 1024); err != nil {
fmt.Printf("Compact Volume before synchronizing %v\n", err)
return true
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 26be1fe3a..e85ab1b9b 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -15,11 +15,11 @@ import (
"sync"
"time"
- "github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
+ grpcRead *bool
}
var (
@@ -65,6 +66,7 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
sharedBytes = make([]byte, 1024)
}
@@ -109,7 +111,7 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
@@ -125,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", 0, strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
@@ -279,23 +281,61 @@ func readFiles(fileIdLineChan chan string, s *stat) {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
- url, err := b.masterClient.LookupFileId(fid)
- if err != nil {
- s.failed++
- println("!!!! ", fid, " location not found!!!!!")
- continue
+ var bytesRead int
+ var err error
+ if *b.grpcRead {
+ volumeServer, err := b.masterClient.LookupVolumeServer(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
+ } else {
+ url, err := b.masterClient.LookupFileId(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ var bytes []byte
+ bytes, err = util.Get(url)
+ bytesRead = len(bytes)
}
- if bytesRead, err := util.Get(url); err == nil {
+ if err == nil {
s.completed++
- s.transferred += int64(len(bytesRead))
+ s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
+ fmt.Printf("Failed to read %s error:%v\n", fid, err)
}
}
}
+func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
+ err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid})
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, respErr := fileGetClient.Recv()
+ if resp != nil {
+ bytesRead += len(resp.Data)
+ }
+ if respErr != nil {
+ if respErr == io.EOF {
+ return nil
+ }
+ return respErr
+ }
+ }
+ })
+ return
+}
+
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
diff --git a/weed/command/command.go b/weed/command/command.go
index 79c00d4cd..9dc51e922 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -20,6 +20,7 @@ var Commands = []*Command{
cmdS3,
cmdUpload,
cmdDownload,
+ cmdMsgBroker,
cmdScaffold,
cmdShell,
cmdVersion,
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 4a54f5670..85313b749 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -17,6 +17,9 @@ var cmdCompact = &Command{
The compacted .dat file is stored as .cpd file.
The compacted .idx file is stored as .cpx file.
+ For method=0, it compacts based on the .dat file, works if .idx file is corrupted.
+ For method=1, it compacts based on the .idx file, works if deletion happened but not written to .dat files.
+
`,
}
@@ -47,7 +50,7 @@ func runCompact(cmd *Command, args []string) bool {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
} else {
- if err = v.Compact2(); err != nil {
+ if err = v.Compact2(preallocate); err != nil {
glog.Fatalf("Compact Volume [ERROR] %s\n", err)
}
}
diff --git a/weed/command/download.go b/weed/command/download.go
index b3e33defd..be0eb47e5 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -71,6 +71,7 @@ func downloadToFile(server, fileId, saveDir string) error {
}
f, err := os.OpenFile(path.Join(saveDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
+ io.Copy(ioutil.Discard, rc)
return err
}
defer f.Close()
diff --git a/weed/command/export.go b/weed/command/export.go
index d3a765e09..8c32b3f4d 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"fmt"
+ "io"
"os"
"path"
"path/filepath"
@@ -12,11 +13,11 @@ import (
"text/template"
"time"
- "io"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -89,12 +90,12 @@ func printNeedle(vid needle.VolumeId, n *needle.Needle, version needle.Version,
type VolumeFileScanner4Export struct {
version needle.Version
counter int
- needleMap *storage.NeedleMap
+ needleMap *needle_map.MemDb
vid needle.VolumeId
}
-func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Export) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -192,15 +193,12 @@ func runExport(cmd *Command, args []string) bool {
fileName = *export.collection + "_" + fileName
}
vid := needle.VolumeId(*export.volumeId)
- indexFile, err := os.OpenFile(path.Join(*export.dir, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- needleMap, err := storage.LoadBtreeNeedleMap(indexFile)
- if err != nil {
- glog.Fatalf("cannot load needle map from %s: %s", indexFile.Name(), err)
+ needleMap := needle_map.NewMemDb()
+ defer needleMap.Close()
+
+ if err := needleMap.LoadFromIdx(path.Join(*export.dir, fileName+".idx")); err != nil {
+ glog.Fatalf("cannot load needle map from %s.idx: %s", fileName, err)
}
volumeFileScanner := &VolumeFileScanner4Export{
diff --git a/weed/command/filer.go b/weed/command/filer.go
index b1ceb46f5..fb1ee2b0f 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -6,14 +6,14 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
+ "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -27,13 +27,13 @@ type FilerOptions struct {
publicPort *int
collection *string
defaultReplicaPlacement *string
- redirectOnRead *bool
disableDirListing *bool
maxMB *int
dirListingLimit *int
dataCenter *string
enableNotification *bool
disableHttp *bool
+ cipher *bool
// default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
@@ -47,12 +47,12 @@ func init() {
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
- f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to write to volumes in this data center")
f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
+ f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
}
var cmdFiler = &Command{
@@ -103,14 +103,14 @@ func (fo *FilerOptions) startFiler() {
Masters: strings.Split(*fo.masters, ","),
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
- RedirectOnRead: *fo.redirectOnRead,
DisableDirListing: *fo.disableDirListing,
MaxMB: *fo.maxMB,
DirListingLimit: *fo.dirListingLimit,
DataCenter: *fo.dataCenter,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
- Port: *fo.port,
+ Port: uint32(*fo.port),
+ Cipher: *fo.cipher,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
@@ -145,7 +145,7 @@ func (fo *FilerOptions) startFiler() {
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "filer"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
reflection.Register(grpcS)
go grpcS.Serve(grpcL)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 9995cf6aa..0aee8cd80 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -14,13 +14,15 @@ import (
"sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
)
var (
@@ -37,9 +39,10 @@ type CopyOptions struct {
masterClient *wdclient.MasterClient
concurrenctFiles *int
concurrenctChunks *int
- compressionLevel *int
grpcDialOption grpc.DialOption
masters []string
+ cipher bool
+ ttlSec int32
}
func init() {
@@ -52,7 +55,6 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
- copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
var cmdCopy = &Command{
@@ -105,11 +107,9 @@ func runCopy(cmd *Command, args []string) bool {
filerGrpcPort := filerPort + 10000
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
- copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- ctx := context.Background()
-
- masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
if err != nil {
fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
return false
@@ -124,10 +124,14 @@ func runCopy(cmd *Command, args []string) bool {
*copy.maxMB = int(maxMB)
}
copy.masters = masters
+ copy.cipher = cipher
- copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
- go copy.masterClient.KeepConnectedToMaster()
- copy.masterClient.WaitUntilConnected()
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
if *cmdCopy.IsDebug {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
@@ -153,7 +157,7 @@ func runCopy(cmd *Command, args []string) bool {
filerHost: filerUrl.Host,
filerGrpcAddress: filerGrpcAddress,
}
- if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
+ if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
return
}
@@ -164,13 +168,14 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
- err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ cipher = resp.Cipher
return nil
})
return
@@ -215,9 +220,9 @@ type FileCopyWorker struct {
filerGrpcAddress string
}
-func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
+func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
for task := range fileCopyTaskChan {
- if err := worker.doEachCopy(ctx, task); err != nil {
+ if err := worker.doEachCopy(task); err != nil {
return err
}
}
@@ -233,7 +238,7 @@ type FileCopyTask struct {
gid uint32
}
-func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
+func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
f, err := os.Open(task.sourceLocation)
if err != nil {
@@ -261,36 +266,55 @@ func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask)
}
if chunkCount == 1 {
- return worker.uploadFileAsOne(ctx, task, f)
+ return worker.uploadFileAsOne(task, f)
}
- return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
-func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
+func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return err
+ }
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
+ uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -300,17 +324,19 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: 0,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: assignResult.FileId,
+ Offset: 0,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.Md5,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
})
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -325,13 +351,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -342,7 +368,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
return nil
}
-func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
+func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -352,6 +378,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
+ var collection, replication string
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +390,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
<-concurrentChunks
}()
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.NewSectionReader(f, i*chunkSize, chunkSize),
- false, "application/octet-stream", nil, assignResult.Auth)
+ uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -388,11 +435,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
return
}
chunksChan <- &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: assignResult.FileId,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
}
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
@@ -410,11 +459,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+ operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
return uploadError
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -427,15 +476,15 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ Replication: replication,
+ Collection: collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -457,18 +506,12 @@ func detectMimeType(f *os.File) string {
}
if err != nil {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
- return "application/octet-stream"
+ return ""
}
f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
+ if mimeType == "application/octet-stream" {
+ return ""
+ }
return mimeType
}
-
-func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(clientConn)
- return fn(client)
- }, filerAddress, grpcDialOption)
-
-}
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index c6e7f5dba..737f0d24a 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -39,7 +39,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
util.LoadConfiguration("notification", true)
- config := viper.GetViper()
+ config := util.GetViper()
var notificationInput sub.NotificationInput
@@ -47,8 +47,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
- viperSub := config.Sub("notification." + input.GetName())
- if err := input.Initialize(viperSub); err != nil {
+ if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize notification input for %s: %+v",
input.GetName(), err)
}
@@ -66,10 +65,9 @@ func runFilerReplicate(cmd *Command, args []string) bool {
// avoid recursive replication
if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
- sourceConfig, sinkConfig := config.Sub("source.filer"), config.Sub("sink.filer")
- if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
- fromDir := sourceConfig.GetString("directory")
- toDir := sinkConfig.GetString("directory")
+ if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
+ fromDir := config.GetString("source.filer.directory")
+ toDir := config.GetString("sink.filer.directory")
if strings.HasPrefix(toDir, fromDir) {
glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
}
@@ -79,8 +77,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
var dataSink sink.ReplicationSink
for _, sk := range sink.Sinks {
if config.GetBool("sink." + sk.GetName() + ".enabled") {
- viperSub := config.Sub("sink." + sk.GetName())
- if err := sk.Initialize(viperSub); err != nil {
+ if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
glog.Fatalf("Failed to initialize sink for %s: %+v",
sk.GetName(), err)
}
@@ -98,7 +95,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
return true
}
- replicator := replication.NewReplicator(config.Sub("source.filer"), dataSink)
+ replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
key, m, err := notificationInput.ReceiveMessage()
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 2fbbca5e6..90d1c4893 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -8,6 +8,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -31,11 +33,11 @@ var (
type VolumeFileScanner4Fix struct {
version needle.Version
- nm *storage.NeedleMap
+ nm *needle_map.MemDb
}
-func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
@@ -46,11 +48,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
- pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
glog.V(2).Infof("saved %d with error %v", n.Size, pe)
} else {
glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id, types.ToOffset(offset))
+ return scanner.nm.Delete(n.Id)
}
return nil
}
@@ -66,13 +68,8 @@ func runFix(cmd *Command, args []string) bool {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
- indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
+ nm := needle_map.NewMemDb()
defer nm.Close()
vid := needle.VolumeId(*fixVolumeId)
@@ -80,9 +77,13 @@ func runFix(cmd *Command, args []string) bool {
nm: nm,
}
- err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner)
- if err != nil {
- glog.Fatalf("Export Volume File [ERROR] %s\n", err)
+ if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
+ glog.Fatalf("scan .dat File: %v", err)
+ os.Remove(indexFileName)
+ }
+
+ if err := nm.SaveToIdx(indexFileName); err != nil {
+ glog.Fatalf("save to .idx File: %v", err)
os.Remove(indexFileName)
}
diff --git a/weed/command/master.go b/weed/command/master.go
index 3d33f4f7a..1be60426f 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -8,14 +8,16 @@ import (
"strings"
"github.com/chrislusf/raft/protobuf"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc/reflection"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
- "github.com/spf13/viper"
- "google.golang.org/grpc/reflection"
)
var (
@@ -101,6 +103,8 @@ func runMaster(cmd *Command, args []string) bool {
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
+ backend.LoadConfiguration(util.GetViper())
+
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
r := mux.NewRouter()
@@ -112,7 +116,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("Master startup error: %v", e)
}
// start raftServer
- raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
+ raftServer := weed_server.NewRaftServer(security.LoadClientTLS(util.GetViper(), "grpc.master"),
peers, myMasterAddress, *masterOption.metaFolder, ms.Topo, *masterOption.pulseSeconds)
if raftServer == nil {
glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717", *masterOption.metaFolder)
@@ -126,7 +130,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
// Create your protocol servers.
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
master_pb.RegisterSeaweedServer(grpcS, ms)
protobuf.RegisterRaftServer(grpcS, raftServer)
reflection.Register(grpcS)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 71c1a4387..f1448c6cc 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,23 +1,18 @@
package command
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
type MountOptions struct {
- filer *string
- filerMountRootPath *string
- dir *string
- dirListingLimit *int
- collection *string
- replication *string
- ttlSec *int
- chunkSizeLimitMB *int
- dataCenter *string
- allowOthers *bool
- umaskString *string
+ filer *string
+ filerMountRootPath *string
+ dir *string
+ dirListCacheLimit *int64
+ collection *string
+ replication *string
+ ttlSec *int
+ chunkSizeLimitMB *int
+ dataCenter *string
+ allowOthers *bool
+ umaskString *string
+ outsideContainerClusterMode *bool
}
var (
@@ -31,7 +26,7 @@ func init() {
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.filerMountRootPath = cmdMount.Flag.String("filer.path", "/", "mount this remote path from filer server")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
- mountOptions.dirListingLimit = cmdMount.Flag.Int("dirListLimit", 100000, "limit directory listing size")
+ mountOptions.dirListCacheLimit = cmdMount.Flag.Int64("dirListCacheLimit", 1000000, "limit cache size to speed up directory long format listing")
mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
@@ -41,6 +36,7 @@ func init() {
mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111")
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
+ mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
}
var cmdMount = &Command{
@@ -58,21 +54,11 @@ var cmdMount = &Command{
On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
- `,
-}
-
-func parseFilerGrpcAddress(filer string) (filerGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(filer, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("The filer should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("The filer filer port parse error: %v", parseErr)
- }
+ If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose,
+ the volume servers are not accessible by their own ip addresses.
+ In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming:
+ * All volume server containers are accessible through the same hostname or IP address as the filer.
+ * All volume server container ports are open external to the cluster.
- filerGrpcPort := int(filerPort) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
+ `,
}
diff --git a/weed/command/mount_darwin.go b/weed/command/mount_darwin.go
index 632691e47..f0a5581e7 100644
--- a/weed/command/mount_darwin.go
+++ b/weed/command/mount_darwin.go
@@ -7,3 +7,7 @@ import (
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_freebsd.go b/weed/command/mount_freebsd.go
index 632691e47..f0a5581e7 100644
--- a/weed/command/mount_freebsd.go
+++ b/weed/command/mount_freebsd.go
@@ -7,3 +7,7 @@ import (
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{}
}
+
+func checkMountPointAvailable(dir string) bool {
+ return true
+}
diff --git a/weed/command/mount_linux.go b/weed/command/mount_linux.go
index 7d94e5142..80a5f9da4 100644
--- a/weed/command/mount_linux.go
+++ b/weed/command/mount_linux.go
@@ -1,11 +1,157 @@
package command
import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+
"github.com/seaweedfs/fuse"
)
+const (
+ /* 36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue
+ (1)(2)(3) (4) (5) (6) (7) (8) (9) (10) (11)
+
+ (1) mount ID: unique identifier of the mount (may be reused after umount)
+ (2) parent ID: ID of parent (or of self for the top of the mount tree)
+ (3) major:minor: value of st_dev for files on filesystem
+ (4) root: root of the mount within the filesystem
+ (5) mount point: mount point relative to the process's root
+ (6) mount options: per mount options
+ (7) optional fields: zero or more fields of the form "tag[:value]"
+ (8) separator: marks the end of the optional fields
+ (9) filesystem type: name of filesystem of the form "type[.subtype]"
+ (10) mount source: filesystem specific information or "none"
+ (11) super options: per super block options*/
+ mountinfoFormat = "%d %d %d:%d %s %s %s %s"
+)
+
+// Info reveals information about a particular mounted filesystem. This
+// struct is populated from the content in the /proc/<pid>/mountinfo file.
+type Info struct {
+ // ID is a unique identifier of the mount (may be reused after umount).
+ ID int
+
+ // Parent indicates the ID of the mount parent (or of self for the top of the
+ // mount tree).
+ Parent int
+
+ // Major indicates one half of the device ID which identifies the device class.
+ Major int
+
+ // Minor indicates one half of the device ID which identifies a specific
+ // instance of device.
+ Minor int
+
+ // Root of the mount within the filesystem.
+ Root string
+
+ // Mountpoint indicates the mount point relative to the process's root.
+ Mountpoint string
+
+ // Opts represents mount-specific options.
+ Opts string
+
+ // Optional represents optional fields.
+ Optional string
+
+ // Fstype indicates the type of filesystem, such as EXT3.
+ Fstype string
+
+ // Source indicates filesystem specific information or "none".
+ Source string
+
+ // VfsOpts represents per super block options.
+ VfsOpts string
+}
+
+// Mounted determines if a specified mountpoint has been mounted.
+// On Linux it looks at /proc/self/mountinfo and on Solaris at mnttab.
+func mounted(mountPoint string) (bool, error) {
+ entries, err := parseMountTable()
+ if err != nil {
+ return false, err
+ }
+
+ // Search the table for the mountPoint
+ for _, e := range entries {
+ if e.Mountpoint == mountPoint {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+// Parse /proc/self/mountinfo because comparing Dev and ino does not work from
+// bind mounts
+func parseMountTable() ([]*Info, error) {
+ f, err := os.Open("/proc/self/mountinfo")
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ return parseInfoFile(f)
+}
+
+func parseInfoFile(r io.Reader) ([]*Info, error) {
+ var (
+ s = bufio.NewScanner(r)
+ out []*Info
+ )
+
+ for s.Scan() {
+ if err := s.Err(); err != nil {
+ return nil, err
+ }
+
+ var (
+ p = &Info{}
+ text = s.Text()
+ optionalFields string
+ )
+
+ if _, err := fmt.Sscanf(text, mountinfoFormat,
+ &p.ID, &p.Parent, &p.Major, &p.Minor,
+ &p.Root, &p.Mountpoint, &p.Opts, &optionalFields); err != nil {
+ return nil, fmt.Errorf("Scanning '%s' failed: %s", text, err)
+ }
+ // Safe as mountinfo encodes mountpoints with spaces as \040.
+ index := strings.Index(text, " - ")
+ postSeparatorFields := strings.Fields(text[index+3:])
+ if len(postSeparatorFields) < 3 {
+ return nil, fmt.Errorf("Error found less than 3 fields post '-' in %q", text)
+ }
+
+ if optionalFields != "-" {
+ p.Optional = optionalFields
+ }
+
+ p.Fstype = postSeparatorFields[0]
+ p.Source = postSeparatorFields[1]
+ p.VfsOpts = strings.Join(postSeparatorFields[2:], " ")
+ out = append(out, p)
+ }
+ return out, nil
+}
+
func osSpecificMountOptions() []fuse.MountOption {
return []fuse.MountOption{
fuse.AllowNonEmptyMount(),
}
}
+
+func checkMountPointAvailable(dir string) bool {
+ mountPoint := dir
+ if mountPoint != "/" && strings.HasSuffix(mountPoint, "/") {
+ mountPoint = mountPoint[0 : len(mountPoint)-1]
+ }
+
+ if mounted, err := mounted(mountPoint); err != nil || mounted {
+ return false
+ }
+
+ return true
+}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 6ca9bfdca..9177091a5 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -3,6 +3,7 @@
package command
import (
+ "context"
"fmt"
"os"
"os/user"
@@ -12,12 +13,13 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/jacobsa/daemonize"
- "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -43,13 +45,14 @@ func runMount(cmd *Command, args []string) bool {
*mountOptions.chunkSizeLimitMB,
*mountOptions.allowOthers,
*mountOptions.ttlSec,
- *mountOptions.dirListingLimit,
+ *mountOptions.dirListCacheLimit,
os.FileMode(umask),
+ *mountOptions.outsideContainerClusterMode,
)
}
func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int,
- allowOthers bool, ttlSec int, dirListingLimit int, umask os.FileMode) bool {
+ allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode, outsideContainerClusterMode bool) bool {
util.LoadConfiguration("security", false)
@@ -88,13 +91,19 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
}
}
+ // Ensure target mount point availability
+ if isValid := checkMountPointAvailable(dir); !isValid {
+ glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
+ return false
+ }
+
mountName := path.Base(dir)
options := []fuse.MountOption{
fuse.VolumeName(mountName),
- fuse.FSName("SeaweedFS"),
- fuse.Subtype("SeaweedFS"),
- fuse.NoAppleDouble(),
+ fuse.FSName(filer + ":" + filerMountRootPath),
+ fuse.Subtype("seaweedfs"),
+ // fuse.NoAppleDouble(), // include .DS_Store, otherwise can not delete non-empty folders
fuse.NoAppleXattr(),
fuse.NoBrowse(),
fuse.AutoXattr(),
@@ -116,9 +125,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
c, err := fuse.Mount(dir, options...)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount: %v", err)
daemonize.SignalOutcome(err)
- return false
+ return true
}
util.OnInterrupt(func() {
@@ -126,13 +135,31 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
c.Close()
})
- filerGrpcAddress, err := parseFilerGrpcAddress(filer)
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(filer)
if err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
daemonize.SignalOutcome(err)
+ return true
+ }
+
+ // try to connect to filer, filerBucketsPath may be useful later
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ var cipher bool
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.Fatal(err)
return false
}
+ // find mount point
mountRoot := filerMountRootPath
if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
mountRoot = mountRoot[0 : len(mountRoot)-1]
@@ -141,22 +168,24 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
daemonize.SignalOutcome(nil)
err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{
- FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
- FilerMountRootPath: mountRoot,
- Collection: collection,
- Replication: replication,
- TtlSec: int32(ttlSec),
- ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
- DataCenter: dataCenter,
- DirListingLimit: dirListingLimit,
- EntryCacheTtl: 3 * time.Second,
- MountUid: uid,
- MountGid: gid,
- MountMode: mountMode,
- MountCtime: fileInfo.ModTime(),
- MountMtime: time.Now(),
- Umask: umask,
+ FilerGrpcAddress: filerGrpcAddress,
+ GrpcDialOption: grpcDialOption,
+ FilerMountRootPath: mountRoot,
+ Collection: collection,
+ Replication: replication,
+ TtlSec: int32(ttlSec),
+ ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ DataCenter: dataCenter,
+ DirListCacheLimit: dirListCacheLimit,
+ EntryCacheTtl: 3 * time.Second,
+ MountUid: uid,
+ MountGid: gid,
+ MountMode: mountMode,
+ MountCtime: fileInfo.ModTime(),
+ MountMtime: time.Now(),
+ Umask: umask,
+ OutsideContainerClusterMode: outsideContainerClusterMode,
+ Cipher: cipher,
}))
if err != nil {
fuse.Unmount(dir)
@@ -165,8 +194,9 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
- glog.Fatal(err)
+ glog.V(0).Infof("mount process: %v", err)
daemonize.SignalOutcome(err)
+ return true
}
return true
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
new file mode 100644
index 000000000..3e13b4730
--- /dev/null
+++ b/weed/command/msg_broker.go
@@ -0,0 +1,107 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ messageBrokerStandaloneOptions QueueOptions
+)
+
+type QueueOptions struct {
+ filer *string
+ port *int
+ defaultTtl *string
+}
+
+func init() {
+ cmdMsgBroker.Run = runMsgBroker // break init cycle
+ messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
+ messageBrokerStandaloneOptions.defaultTtl = cmdMsgBroker.Flag.String("ttl", "1h", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+}
+
+var cmdMsgBroker = &Command{
+ UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
+ Short: "<WIP> start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMsgBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return messageBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
+
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*msgBrokerOpt.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ filerQueuesPath := "/queues"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerQueuesPath = resp.DirQueues
+ glog.V(0).Infof("Queue read filer queues dir: %s", filerQueuesPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
+ qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
+ Filers: []string{*msgBrokerOpt.filer},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Port: *msgBrokerOpt.port,
+ })
+
+ // start grpc listener
+ grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ queue_pb.RegisterSeaweedQueueServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index e004bb066..cd4018fbc 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -1,18 +1,20 @@
package command
import (
+ "context"
+ "fmt"
"net/http"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
- "fmt"
+ "github.com/gorilla/mux"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
)
var (
@@ -20,29 +22,89 @@ var (
)
type S3Options struct {
- filer *string
- filerBucketsPath *string
- port *int
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
}
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
- s3StandaloneOptions.filerBucketsPath = cmdS3.Flag.String("filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name, {bucket}.{domainName}")
+ s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
}
var cmdS3 = &Command{
- UsageLine: "s3 -port=8333 -filer=<ip:port>",
+ UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
Short: "start a s3 API compatible server that is backed by a filer",
Long: `start a s3 API compatible server that is backed by a filer.
+ By default, you can use any access key and secret key to access the S3 APIs.
+ To enable credential based access, create a config.json file similar to this:
+
+{
+ "identities": [
+ {
+ "name": "some_name",
+ "credentials": [
+ {
+ "accessKey": "some_access_key1",
+ "secretKey": "some_secret_key1"
+ }
+ ],
+ "actions": [
+ "Admin",
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "some_read_only_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key2",
+ "secretKey": "some_secret_key2"
+ }
+ ],
+ "actions": [
+ "Read"
+ ]
+ },
+ {
+ "name": "some_normal_user",
+ "credentials": [
+ {
+ "accessKey": "some_access_key3",
+ "secretKey": "some_secret_key3"
+ }
+ ],
+ "actions": [
+ "Read",
+ "Write"
+ ]
+ },
+ {
+ "name": "user_limited_to_bucket1",
+ "credentials": [
+ {
+ "accessKey": "some_access_key4",
+ "secretKey": "some_secret_key4"
+ }
+ ],
+ "actions": [
+ "Read:bucket1",
+ "Write:bucket1"
+ ]
+ }
+ ]
+}
+
`,
}
@@ -56,20 +118,44 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*s3opt.filer)
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*s3opt.filer)
if err != nil {
glog.Fatal(err)
return false
}
+ filerBucketsPath := "/buckets"
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ filerBucketsPath = resp.DirBuckets
+ glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ break
+ }
+ }
+
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
Filer: *s3opt.filer,
FilerGrpcAddress: filerGrpcAddress,
+ Config: *s3opt.config,
DomainName: *s3opt.domainName,
- BucketsPath: *s3opt.filerBucketsPath,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
+ BucketsPath: filerBucketsPath,
+ GrpcDialOption: grpcDialOption,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 13091764e..f4a08fb51 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -14,6 +14,14 @@ var cmdScaffold = &Command{
Short: "generate basic configuration files",
Long: `Generate filer.toml with all possible configurations for you to customize.
+ The options can also be overwritten by environment variables.
+ For example, the filer.toml mysql password can be overwritten by environment variable
+ export WEED_MYSQL_PASSWORD=some_password
+ Environment variable rules:
+ * Prefix fix with "WEED_"
+ * Upppercase the reset of variable name.
+ * Replace '.' with '_'
+
`,
}
@@ -59,14 +67,21 @@ const (
# $HOME/.seaweedfs/filer.toml
# /etc/seaweedfs/filer.toml
-[memory]
-# local in memory, mostly for testing purpose
-enabled = false
+####################################################
+# Customizable filer server options
+####################################################
+[filer.options]
+# with http DELETE, by default the filer would check whether a folder is empty.
+# recursive_delete will delete all sub folders and files, similar to "rm -Rf"
+recursive_delete = false
+# directories under this folder will be automatically creating a separate bucket
+buckets_folder = "/buckets"
+# directories under this folder will be store message queue data
+queues_folder = "/queues"
-[leveldb]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-enabled = false
-dir = "." # directory to store level db files
+####################################################
+# The following are filer store options
+####################################################
[leveldb2]
# local on disk, mostly for simple single-machine setup, fairly scalable
@@ -74,10 +89,6 @@ dir = "." # directory to store level db files
enabled = true
dir = "." # directory to store level db files
-####################################################
-# multiple filers on shared storage, fairly scalable
-####################################################
-
[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
@@ -95,6 +106,7 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+interpolateParams = false
[postgres] # or cockroachdb
# CREATE TABLE IF NOT EXISTS filemeta (
@@ -144,6 +156,10 @@ addresses = [
"localhost:30006",
]
password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = true
+# automatically use the closest Redis server for reads
+routeByLatency = true
[etcd]
enabled = false
@@ -310,6 +326,10 @@ key = ""
cert = ""
key = ""
+[grpc.msg_broker]
+cert = ""
+key = ""
+
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
[grpc.client]
@@ -350,19 +370,33 @@ sleep_minutes = 17 # sleep minutes between each script execution
default_filer_url = "http://localhost:8888/"
[master.sequencer]
-type = memory # Choose [memory|etcd] type for storing the file id sequence
+type = "memory" # Choose [memory|etcd] type for storing the file id sequence
# when sequencer.type = etcd, set listen client urls of etcd cluster that store file id sequence
# example : http://127.0.0.1:2379,http://127.0.0.1:2389
-sequencer_etcd_urls = http://127.0.0.1:2379
-
-
-[storage.backend.s3]
-enabled = true
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name" # an existing bucket
-directory = "/" # destination directory
+sequencer_etcd_urls = "http://127.0.0.1:2379"
+
+
+# configurations for tiered cloud storage
+# old volumes are transparently moved to cloud for cost efficiency
+[storage.backend]
+ [storage.backend.s3.default]
+ enabled = false
+ aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ region = "us-east-2"
+ bucket = "your_bucket_name" # an existing bucket
+
+# create this number of logical volumes if no more writable volumes
+# count_x means how many copies of data.
+# e.g.:
+# 000 has only one copy, count_1
+# 010 and 001 has two copies, count_2
+# 011 has only 3 copies, count_3
+[master.volume_growth]
+count_1 = 7 # create 1 x 7 = 7 actual volumes
+count_2 = 6 # create 2 x 6 = 12 actual volumes
+count_3 = 3 # create 3 x 3 = 9 actual volumes
+count_other = 1 # create n x 1 = n actual volumes
`
)
diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go
new file mode 100644
index 000000000..423dacc32
--- /dev/null
+++ b/weed/command/scaffold_test.go
@@ -0,0 +1,44 @@
+package command
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/spf13/viper"
+)
+
+func TestReadingTomlConfiguration(t *testing.T) {
+
+ viper.SetConfigType("toml")
+
+ // any approach to require this configuration into your program.
+ var tomlExample = []byte(`
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+ # You can indent as you please. Tabs or spaces. TOML don't care.
+ [servers.alpha]
+ ip = "10.0.0.1"
+ dc = "eqdc10"
+
+ [servers.beta]
+ ip = "10.0.0.2"
+ dc = "eqdc10"
+
+`)
+
+ viper.ReadConfig(bytes.NewBuffer(tomlExample))
+
+ fmt.Printf("database is %v\n", viper.Get("database"))
+ fmt.Printf("servers is %v\n", viper.GetStringMap("servers"))
+
+ alpha := viper.Sub("servers.alpha")
+
+ fmt.Printf("alpha ip is %v\n", alpha.GetString("ip"))
+}
diff --git a/weed/command/server.go b/weed/command/server.go
index 87f404ed3..560b90037 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -78,10 +78,10 @@ func init() {
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
- filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
+ filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -89,13 +89,14 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
- s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}")
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
+ s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
}
@@ -113,10 +114,6 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- if *filerOptions.redirectOnRead {
- *isStartingFiler = true
- }
-
if *isStartingS3 {
*isStartingFiler = true
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 34b5aef31..dcf70608f 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
var (
@@ -31,7 +30,7 @@ var cmdShell = &Command{
func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
- shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
var filerPwdErr error
shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, filerPwdErr = util.ParseFilerUrl(*shellInitialFilerUrl)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 25e938d9b..d71046131 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -6,11 +6,9 @@ import (
"os"
"path/filepath"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
-
- "github.com/chrislusf/seaweedfs/weed/operation"
)
var (
@@ -63,7 +61,7 @@ var cmdUpload = &Command{
func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
if len(args) == 0 {
if *upload.dir == "" {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 3c1aa2b50..4773d8a55 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"os"
"runtime"
@@ -9,15 +10,20 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/httpdown"
+
+ "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc/reflection"
)
var (
@@ -44,6 +50,7 @@ type VolumeServerOptions struct {
cpuProfile *string
memProfile *string
compactionMBPerSecond *int
+ fileSizeLimitMB *int
}
func init() {
@@ -64,6 +71,7 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
}
var cmdVolume = &Command{
@@ -94,7 +102,7 @@ func runVolume(cmd *Command, args []string) bool {
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
- //Set multiple folders and each folder's max volume count limit'
+ // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
@@ -113,7 +121,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
- //security related white list configuration
+ // security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
@@ -128,11 +136,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
}
- isSeperatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
+ if v.isSeparatedPublicPort() {
publicVolumeMux = http.NewServeMux()
}
@@ -156,53 +163,134 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
*v.compactionMBPerSecond,
+ *v.fileSizeLimitMB,
)
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
- listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
+ // starting grpc server
+ grpcS := v.startGrpcService(volumeServer)
+
+ // starting public http server
+ var publicHttpDown httpdown.Server
+ if v.isSeparatedPublicPort() {
+ publicHttpDown = v.startPublicHttpService(publicVolumeMux)
+ if nil == publicHttpDown {
+ glog.Fatalf("start public http service failed")
}
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
}
+ // starting the cluster http server
+ clusterHttpServer := v.startClusterHttpService(volumeMux)
+
+ stopChain := make(chan struct{})
util.OnInterrupt(func() {
+ fmt.Println("volume server has be killed")
+ var startTime time.Time
+
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ startTime = time.Now()
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop public http server, elapsed %dms", delta)
+ }
+
+ startTime = time.Now()
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
+
+ startTime = time.Now()
+ grpcS.GracefulStop()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+
+ startTime = time.Now()
volumeServer.Shutdown()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+
pprof.StopCPUProfile()
+
+ close(stopChain) // notify exit
})
- // starting grpc server
+ select {
+ case <-stopChain:
+ }
+ glog.Warningf("the volume server exit.")
+}
+
+// check whether configure the public port
+func (v VolumeServerOptions) isSeparatedPublicPort() bool {
+ return *v.publicPort != *v.port
+}
+
+func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
- grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
- volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.volume"))
+ volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- if viper.GetString("https.volume.key") != "" {
- if e := http.ServeTLS(listener, volumeMux,
- viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ go func() {
+ if err := grpcS.Serve(grpcL); err != nil {
+ glog.Fatalf("start gRPC service failed, %s", err)
}
- } else {
- if e := http.Serve(listener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ }()
+ return grpcS
+}
+
+func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
+ publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
+ publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
+ publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
+ go func() {
+ if err := publicHttpDown.Wait(); err != nil {
+ glog.Errorf("public http down wait failed, %v", err)
}
+ }()
+
+ return publicHttpDown
+}
+
+func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
+ var (
+ certFile, keyFile string
+ )
+ if viper.GetString("https.volume.key") != "" {
+ certFile = viper.GetString("https.volume.cert")
+ keyFile = viper.GetString("https.volume.key")
}
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
+ listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ httpDown := httpdown.HTTP{
+ KillTimeout: 5 * time.Minute,
+ StopTimeout: 5 * time.Minute,
+ CertFile: certFile,
+ KeyFile: keyFile}
+ clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ go func() {
+ if e := clusterHttpServer.Wait(); e != nil {
+ glog.Fatalf("Volume server fail to serve: %v", e)
+ }
+ }()
+ return clusterHttpServer
}
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 371c4a9ad..4f5d5f5ce 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"net/http"
"os/user"
@@ -8,10 +9,11 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
var (
@@ -37,7 +39,7 @@ func init() {
var cmdWebDav = &Command{
UsageLine: "webdav -port=7333 -filer=<ip:port>",
- Short: "<unstable> start a webdav server that is backed by a filer",
+ Short: "start a webdav server that is backed by a filer",
Long: `start a webdav server that is backed by a filer.
`,
@@ -55,12 +57,6 @@ func runWebDav(cmd *Command, args []string) bool {
func (wo *WebDavOption) startWebDav() bool {
- filerGrpcAddress, err := parseFilerGrpcAddress(*wo.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
-
// detect current user
uid, gid := uint32(0), uint32(0)
if u, err := user.Current(); err == nil {
@@ -72,13 +68,43 @@ func (wo *WebDavOption) startWebDav() bool {
}
}
+ // parse filer grpc address
+ filerGrpcAddress, err := pb.ParseFilerGrpcAddress(*wo.filer)
+ if err != nil {
+ glog.Fatal(err)
+ return false
+ }
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var cipher bool
+ // connect to filer
+ for {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ break
+ }
+ }
+
ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
Filer: *wo.filer,
FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "client"),
+ GrpcDialOption: grpcDialOption,
Collection: *wo.collection,
Uid: uid,
Gid: gid,
+ Cipher: cipher,
})
if webdavServer_err != nil {
glog.Fatalf("WebDav Server startup error: %v", webdavServer_err)