aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer.go48
-rw-r--r--weed/command/filer_copy.go220
-rw-r--r--weed/command/mount.go11
-rw-r--r--weed/command/mount_std.go9
-rw-r--r--weed/command/server.go21
-rw-r--r--weed/command/volume.go9
6 files changed, 236 insertions, 82 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a2929d0d3..1bd1493bd 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -2,17 +2,18 @@ package command
import (
"net/http"
- "os"
"strconv"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/soheilhy/cmux"
- "google.golang.org/grpc/reflection"
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/grpc/reflection"
+ "strings"
)
var (
@@ -20,50 +21,35 @@ var (
)
type FilerOptions struct {
- master *string
+ masters *string
ip *string
port *int
publicPort *int
collection *string
defaultReplicaPlacement *string
- dir *string
redirectOnRead *bool
disableDirListing *bool
- confFile *string
maxMB *int
secretKey *string
- cassandra_server *string
- cassandra_keyspace *string
- redis_server *string
- redis_password *string
- redis_database *int
}
func init() {
cmdFiler.Run = runFiler // break init cycle
- f.master = cmdFiler.Flag.String("master", "localhost:9333", "master server location")
+ f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection")
f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address")
f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public")
- f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data")
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
- f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file")
f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit")
- f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
- f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
- f.redis_server = cmdFiler.Flag.String("redis.server", "", "comma separated host:port[,host2:port2]* of the redis server, e.g., 127.0.0.1:6379")
- f.redis_password = cmdFiler.Flag.String("redis.password", "", "password in clear text")
- f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
-
}
var cmdFiler = &Command{
- UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>",
- Short: "start a file server that points to a master server",
+ UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
+ Short: "start a file server that points to a master server, or a list of master servers",
Long: `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created
@@ -75,20 +61,15 @@ var cmdFiler = &Command{
//return a json format subdirectory and files listing
GET /path/to/
- Current <fullpath~fileid> mapping metadata store is local embedded leveldb.
- It should be highly scalable to hundreds of millions of files on a modest machine.
+ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.
- Future we will ensure it can avoid of being SPOF.
+ The following are example filer.toml configuration file.
- `,
+` + filer2.FILER_TOML_EXAMPLE + "\n",
}
func runFiler(cmd *Command, args []string) bool {
- if err := util.TestFolderWritable(*f.dir); err != nil {
- glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err)
- }
-
f.start()
return true
@@ -103,14 +84,13 @@ func (fo *FilerOptions) start() {
publicVolumeMux = http.NewServeMux()
}
+ masters := *f.masters
+
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux,
- *fo.ip, *fo.port, *fo.master, *fo.dir, *fo.collection,
+ *fo.ip, *fo.port, strings.Split(masters, ","), *fo.collection,
*fo.defaultReplicaPlacement, *fo.redirectOnRead, *fo.disableDirListing,
- *fo.confFile,
*fo.maxMB,
*fo.secretKey,
- *fo.cassandra_server, *fo.cassandra_keyspace,
- *fo.redis_server, *fo.redis_password, *fo.redis_database,
)
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index da7fb43bb..904aac76c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -9,8 +9,15 @@ import (
"strings"
"github.com/chrislusf/seaweedfs/weed/operation"
- filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer"
"github.com/chrislusf/seaweedfs/weed/security"
+ "path"
+ "net/http"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "strconv"
+ "io"
+ "time"
+ "google.golang.org/grpc"
+ "context"
)
var (
@@ -68,20 +75,20 @@ func runCopy(cmd *Command, args []string) bool {
return false
}
filerDestination := args[len(args)-1]
- fileOrDirs := args[0 : len(args)-1]
+ fileOrDirs := args[0: len(args)-1]
filerUrl, err := url.Parse(filerDestination)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
- path := filerUrl.Path
- if !strings.HasSuffix(path, "/") {
- path = path + "/"
+ urlPath := filerUrl.Path
+ if !strings.HasSuffix(urlPath, "/") {
+ urlPath = urlPath + "/"
}
for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, path) {
+ if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) {
return false
}
}
@@ -91,14 +98,14 @@ func runCopy(cmd *Command, args []string) bool {
func doEachCopy(fileOrDir string, host string, path string) bool {
f, err := os.Open(fileOrDir)
if err != nil {
- fmt.Printf("Failed to open file %s: %v", fileOrDir, err)
+ fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err)
return false
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
- fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err)
+ fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err)
return false
}
@@ -120,28 +127,199 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
}
}
- parts, err := operation.NewFileParts([]string{fileOrDir})
- if err != nil {
- fmt.Printf("Failed to read file %s: %v", fileOrDir, err)
+ // find the chunk count
+ chunkSize := int64(*copy.maxMB * 1024 * 1024)
+ chunkCount := 1
+ if chunkSize > 0 && fi.Size() > chunkSize {
+ chunkCount = int(fi.Size()/chunkSize) + 1
}
- results, err := operation.SubmitFiles(*copy.master, parts,
- *copy.replication, *copy.collection, "",
- *copy.ttl, *copy.maxMB, copy.secret)
- if err != nil {
- fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
+ if chunkCount == 1 {
+ return uploadFileAsOne(host, path, f, fi)
+ }
+
+ return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize)
+}
+
+func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool {
+
+ // upload the file content
+ fileName := filepath.Base(f.Name())
+ mimeType := detectMimeType(f)
+ isGzipped := isGzipped(fileName)
+
+ var chunks []*filer_pb.FileChunk
+
+ if fi.Size() > 0 {
+
+ // assign a volume
+ assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
+ Count: 1,
+ Replication: *copy.replication,
+ Collection: *copy.collection,
+ Ttl: *copy.ttl,
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ }
+
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+
+ uploadResult, err := operation.Upload(targetUrl, fileName, f, isGzipped, mimeType, nil, "")
+ if err != nil {
+ fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return false
+ }
+ if uploadResult.Error != "" {
+ fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return false
+ }
+ fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: 0,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ })
+
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
+ }
+
+ if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.CreateEntryRequest{
+ Directory: urlFolder,
+ Entry: &filer_pb.Entry{
+ Name: fileName,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ Gid: uint32(os.Getgid()),
+ Uid: uint32(os.Getuid()),
+ FileSize: uint64(fi.Size()),
+ FileMode: uint32(fi.Mode()),
+ Mime: mimeType,
+ },
+ Chunks: chunks,
+ },
+ }
+
+ if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ return fmt.Errorf("update fh: %v", err)
+ }
+ return nil
+ }); err != nil {
+ fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
+ return false
}
- if strings.HasSuffix(path, "/") {
- path = path + fi.Name()
+ return true
+}
+
+func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+
+ fileName := filepath.Base(f.Name())
+ mimeType := detectMimeType(f)
+
+ var chunks []*filer_pb.FileChunk
+
+ for i := int64(0); i < int64(chunkCount); i++ {
+
+ // assign a volume
+ assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
+ Count: 1,
+ Replication: *copy.replication,
+ Collection: *copy.collection,
+ Ttl: *copy.ttl,
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ }
+
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+
+ uploadResult, err := operation.Upload(targetUrl,
+ fileName+"-"+strconv.FormatInt(i+1, 10),
+ io.LimitReader(f, chunkSize),
+ false, "application/octet-stream", nil, "")
+ if err != nil {
+ fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return false
+ }
+ if uploadResult.Error != "" {
+ fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return false
+ }
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ })
+ fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}
- if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil {
- fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err)
+ if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.CreateEntryRequest{
+ Directory: urlFolder,
+ Entry: &filer_pb.Entry{
+ Name: fileName,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ Gid: uint32(os.Getgid()),
+ Uid: uint32(os.Getuid()),
+ FileSize: uint64(fi.Size()),
+ FileMode: uint32(fi.Mode()),
+ Mime: mimeType,
+ },
+ Chunks: chunks,
+ },
+ }
+
+ if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ return fmt.Errorf("update fh: %v", err)
+ }
+ return nil
+ }); err != nil {
+ fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err)
return false
}
- fmt.Printf("Copy %s => http://%s%s\n", fileOrDir, host, path)
+ fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName)
return true
}
+
+func isGzipped(filename string) bool {
+ return strings.ToLower(path.Ext(filename)) == ".gz"
+}
+
+func detectMimeType(f *os.File) string {
+ head := make([]byte, 512)
+ f.Seek(0, 0)
+ n, err := f.Read(head)
+ if err == io.EOF {
+ return ""
+ }
+ if err != nil {
+ fmt.Printf("read head of %v: %v\n", f.Name(), err)
+ return "application/octet-stream"
+ }
+ f.Seek(0, 0)
+ mimeType := http.DetectContentType(head[:n])
+ return mimeType
+}
+
+func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure())
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
+ }
+ defer grpcConnection.Close()
+
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+
+ return fn(client)
+}
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 746e4b92e..6ba3b3697 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -1,8 +1,11 @@
package command
type MountOptions struct {
- filer *string
- dir *string
+ filer *string
+ dir *string
+ collection *string
+ replication *string
+ chunkSizeLimitMB *int
}
var (
@@ -11,9 +14,11 @@ var (
func init() {
cmdMount.Run = runMount // break init cycle
- cmdMount.IsDebug = cmdMount.Flag.Bool("debug", false, "verbose debug information")
mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location")
mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory")
+ mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files")
+ mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files")
+ mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files")
}
var cmdMount = &Command{
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 4908bdbff..d8b6884ff 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -8,9 +8,9 @@ import (
"bazil.org/fuse"
"bazil.org/fuse/fs"
+ "github.com/chrislusf/seaweedfs/weed/filesys"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/filesys"
)
func runMount(cmd *Command, args []string) bool {
@@ -19,6 +19,10 @@ func runMount(cmd *Command, args []string) bool {
fmt.Printf("Please specify the mount directory via \"-dir\"")
return false
}
+ if *mountOptions.chunkSizeLimitMB <= 0 {
+ fmt.Printf("Please specify a reasonable buffer size.")
+ return false
+ }
fuse.Unmount(*mountOptions.dir)
@@ -47,7 +51,8 @@ func runMount(cmd *Command, args []string) bool {
c.Close()
})
- err = fs.Serve(c, filesys.NewSeaweedFileSystem(*mountOptions.filer))
+ err = fs.Serve(c, filesys.NewSeaweedFileSystem(
+ *mountOptions.filer, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB))
if err != nil {
fuse.Unmount(*mountOptions.dir)
}
diff --git a/weed/command/server.go b/weed/command/server.go
index 2425532ac..606845199 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -83,21 +83,13 @@ var (
func init() {
serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file")
- filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
- filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
- filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
- filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
- filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
- filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
- filerOptions.redis_password = cmdServer.Flag.String("filer.redis.password", "", "redis password in clear text")
- filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
}
func runServer(cmd *Command, args []string) bool {
@@ -115,7 +107,7 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- *filerOptions.master = *serverIp + ":" + strconv.Itoa(*masterPort)
+ master := *serverIp + ":" + strconv.Itoa(*masterPort)
filerOptions.ip = serverIp
if *filerOptions.defaultReplicaPlacement == "" {
@@ -157,15 +149,6 @@ func runServer(cmd *Command, args []string) bool {
if *masterMetaFolder == "" {
*masterMetaFolder = folders[0]
}
- if *isStartingFiler {
- if *filerOptions.dir == "" {
- *filerOptions.dir = *masterMetaFolder + "/filer"
- os.MkdirAll(*filerOptions.dir, 0700)
- }
- if err := util.TestFolderWritable(*filerOptions.dir); err != nil {
- glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err)
- }
- }
if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
}
@@ -267,7 +250,7 @@ func runServer(cmd *Command, args []string) bool {
*serverIp, *volumePort, *volumeServerPublicUrl,
folders, maxCounts,
volumeNeedleMapKind,
- *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack,
+ []string{master}, *volumePulse, *serverDataCenter, *serverRack,
serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index a54ffd1fd..407c39eb1 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -26,7 +26,7 @@ type VolumeServerOptions struct {
ip *string
publicUrl *string
bindIp *string
- master *string
+ masters *string
pulseSeconds *int
idleConnectionTimeout *int
maxCpu *int
@@ -47,7 +47,7 @@ func init() {
v.ip = cmdVolume.Flag.String("ip", "", "ip or server name")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
- v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
+ v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
@@ -132,11 +132,14 @@ func runVolume(cmd *Command, args []string) bool {
case "btree":
volumeNeedleMapKind = storage.NeedleMapBtree
}
+
+ masters := *v.masters
+
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
*v.ip, *v.port, *v.publicUrl,
v.folders, v.folderMaxLimits,
volumeNeedleMapKind,
- *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack,
+ strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
)