diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer.go | 48 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 220 | ||||
| -rw-r--r-- | weed/command/mount.go | 11 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 9 | ||||
| -rw-r--r-- | weed/command/server.go | 21 | ||||
| -rw-r--r-- | weed/command/volume.go | 9 |
6 files changed, 236 insertions, 82 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index a2929d0d3..1bd1493bd 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -2,17 +2,18 @@ package command import ( "net/http" - "os" "strconv" "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" "github.com/soheilhy/cmux" - "google.golang.org/grpc/reflection" "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "google.golang.org/grpc/reflection" + "strings" ) var ( @@ -20,50 +21,35 @@ var ( ) type FilerOptions struct { - master *string + masters *string ip *string port *int publicPort *int collection *string defaultReplicaPlacement *string - dir *string redirectOnRead *bool disableDirListing *bool - confFile *string maxMB *int secretKey *string - cassandra_server *string - cassandra_keyspace *string - redis_server *string - redis_password *string - redis_database *int } func init() { cmdFiler.Run = runFiler // break init cycle - f.master = cmdFiler.Flag.String("master", "localhost:9333", "master server location") + f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public") - f.dir = cmdFiler.Flag.String("dir", os.TempDir(), "directory to store meta data") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing") - f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file") f.maxMB = cmdFiler.Flag.Int("maxMB", 32, "split files larger than the limit") - f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server") - f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server") - f.redis_server = cmdFiler.Flag.String("redis.server", "", "comma separated host:port[,host2:port2]* of the redis server, e.g., 127.0.0.1:6379") - f.redis_password = cmdFiler.Flag.String("redis.password", "", "password in clear text") - f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server") f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") - } var cmdFiler = &Command{ - UsageLine: "filer -port=8888 -dir=/tmp -master=<ip:port>", - Short: "start a file server that points to a master server", + UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*", + Short: "start a file server that points to a master server, or a list of master servers", Long: `start a file server which accepts REST operation for any files. //create or overwrite the file, the directories /path/to will be automatically created @@ -75,20 +61,15 @@ var cmdFiler = &Command{ //return a json format subdirectory and files listing GET /path/to/ - Current <fullpath~fileid> mapping metadata store is local embedded leveldb. - It should be highly scalable to hundreds of millions of files on a modest machine. + The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order. - Future we will ensure it can avoid of being SPOF. + The following are example filer.toml configuration file. - `, +` + filer2.FILER_TOML_EXAMPLE + "\n", } func runFiler(cmd *Command, args []string) bool { - if err := util.TestFolderWritable(*f.dir); err != nil { - glog.Fatalf("Check Meta Folder (-dir) Writable %s : %s", *f.dir, err) - } - f.start() return true @@ -103,14 +84,13 @@ func (fo *FilerOptions) start() { publicVolumeMux = http.NewServeMux() } + masters := *f.masters + fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, - *fo.ip, *fo.port, *fo.master, *fo.dir, *fo.collection, + *fo.ip, *fo.port, strings.Split(masters, ","), *fo.collection, *fo.defaultReplicaPlacement, *fo.redirectOnRead, *fo.disableDirListing, - *fo.confFile, *fo.maxMB, *fo.secretKey, - *fo.cassandra_server, *fo.cassandra_keyspace, - *fo.redis_server, *fo.redis_password, *fo.redis_database, ) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index da7fb43bb..904aac76c 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -9,8 +9,15 @@ import ( "strings" "github.com/chrislusf/seaweedfs/weed/operation" - filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer" "github.com/chrislusf/seaweedfs/weed/security" + "path" + "net/http" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "strconv" + "io" + "time" + "google.golang.org/grpc" + "context" ) var ( @@ -68,20 +75,20 @@ func runCopy(cmd *Command, args []string) bool { return false } filerDestination := args[len(args)-1] - fileOrDirs := args[0 : len(args)-1] + fileOrDirs := args[0: len(args)-1] filerUrl, err := url.Parse(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - path := filerUrl.Path - if !strings.HasSuffix(path, "/") { - path = path + "/" + urlPath := filerUrl.Path + if !strings.HasSuffix(urlPath, "/") { + urlPath = urlPath + "/" } for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, path) { + if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) { return false } } @@ -91,14 +98,14 @@ func runCopy(cmd *Command, args []string) bool { func doEachCopy(fileOrDir string, host string, path string) bool { f, err := os.Open(fileOrDir) if err != nil { - fmt.Printf("Failed to open file %s: %v", fileOrDir, err) + fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) return false } defer f.Close() fi, err := f.Stat() if err != nil { - fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err) + fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err) return false } @@ -120,28 +127,199 @@ func doEachCopy(fileOrDir string, host string, path string) bool { } } - parts, err := operation.NewFileParts([]string{fileOrDir}) - if err != nil { - fmt.Printf("Failed to read file %s: %v", fileOrDir, err) + // find the chunk count + chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkCount := 1 + if chunkSize > 0 && fi.Size() > chunkSize { + chunkCount = int(fi.Size()/chunkSize) + 1 } - results, err := operation.SubmitFiles(*copy.master, parts, - *copy.replication, *copy.collection, "", - *copy.ttl, *copy.maxMB, copy.secret) - if err != nil { - fmt.Printf("Failed to submit file %s: %v", fileOrDir, err) + if chunkCount == 1 { + return uploadFileAsOne(host, path, f, fi) + } + + return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize) +} + +func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool { + + // upload the file content + fileName := filepath.Base(f.Name()) + mimeType := detectMimeType(f) + isGzipped := isGzipped(fileName) + + var chunks []*filer_pb.FileChunk + + if fi.Size() > 0 { + + // assign a volume + assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *copy.replication, + Collection: *copy.collection, + Ttl: *copy.ttl, + }) + if err != nil { + fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + } + + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + + uploadResult, err := operation.Upload(targetUrl, fileName, f, isGzipped, mimeType, nil, "") + if err != nil { + fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) + return false + } + if uploadResult.Error != "" { + fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return false + } + fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: 0, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + }) + + fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName) + } + + if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.CreateEntryRequest{ + Directory: urlFolder, + Entry: &filer_pb.Entry{ + Name: fileName, + Attributes: &filer_pb.FuseAttributes{ + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: uint32(os.Getgid()), + Uid: uint32(os.Getuid()), + FileSize: uint64(fi.Size()), + FileMode: uint32(fi.Mode()), + Mime: mimeType, + }, + Chunks: chunks, + }, + } + + if _, err := client.CreateEntry(context.Background(), request); err != nil { + return fmt.Errorf("update fh: %v", err) + } + return nil + }); err != nil { + fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err) + return false } - if strings.HasSuffix(path, "/") { - path = path + fi.Name() + return true +} + +func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { + + fileName := filepath.Base(f.Name()) + mimeType := detectMimeType(f) + + var chunks []*filer_pb.FileChunk + + for i := int64(0); i < int64(chunkCount); i++ { + + // assign a volume + assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *copy.replication, + Collection: *copy.collection, + Ttl: *copy.ttl, + }) + if err != nil { + fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + } + + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + + uploadResult, err := operation.Upload(targetUrl, + fileName+"-"+strconv.FormatInt(i+1, 10), + io.LimitReader(f, chunkSize), + false, "application/octet-stream", nil, "") + if err != nil { + fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) + return false + } + if uploadResult.Error != "" { + fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return false + } + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: i * chunkSize, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + }) + fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil { - fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err) + if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.CreateEntryRequest{ + Directory: urlFolder, + Entry: &filer_pb.Entry{ + Name: fileName, + Attributes: &filer_pb.FuseAttributes{ + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: uint32(os.Getgid()), + Uid: uint32(os.Getuid()), + FileSize: uint64(fi.Size()), + FileMode: uint32(fi.Mode()), + Mime: mimeType, + }, + Chunks: chunks, + }, + } + + if _, err := client.CreateEntry(context.Background(), request); err != nil { + return fmt.Errorf("update fh: %v", err) + } + return nil + }); err != nil { + fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err) return false } - fmt.Printf("Copy %s => http://%s%s\n", fileOrDir, host, path) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName) return true } + +func isGzipped(filename string) bool { + return strings.ToLower(path.Ext(filename)) == ".gz" +} + +func detectMimeType(f *os.File) string { + head := make([]byte, 512) + f.Seek(0, 0) + n, err := f.Read(head) + if err == io.EOF { + return "" + } + if err != nil { + fmt.Printf("read head of %v: %v\n", f.Name(), err) + return "application/octet-stream" + } + f.Seek(0, 0) + mimeType := http.DetectContentType(head[:n]) + return mimeType +} + +func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { + + grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", filerAddress, err) + } + defer grpcConnection.Close() + + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + + return fn(client) +} diff --git a/weed/command/mount.go b/weed/command/mount.go index 746e4b92e..6ba3b3697 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -1,8 +1,11 @@ package command type MountOptions struct { - filer *string - dir *string + filer *string + dir *string + collection *string + replication *string + chunkSizeLimitMB *int } var ( @@ -11,9 +14,11 @@ var ( func init() { cmdMount.Run = runMount // break init cycle - cmdMount.IsDebug = cmdMount.Flag.Bool("debug", false, "verbose debug information") mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") + mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") + mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files") + mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 16, "local write buffer size, also chunk large files") } var cmdMount = &Command{ diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 4908bdbff..d8b6884ff 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -8,9 +8,9 @@ import ( "bazil.org/fuse" "bazil.org/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/filesys" ) func runMount(cmd *Command, args []string) bool { @@ -19,6 +19,10 @@ func runMount(cmd *Command, args []string) bool { fmt.Printf("Please specify the mount directory via \"-dir\"") return false } + if *mountOptions.chunkSizeLimitMB <= 0 { + fmt.Printf("Please specify a reasonable buffer size.") + return false + } fuse.Unmount(*mountOptions.dir) @@ -47,7 +51,8 @@ func runMount(cmd *Command, args []string) bool { c.Close() }) - err = fs.Serve(c, filesys.NewSeaweedFileSystem(*mountOptions.filer)) + err = fs.Serve(c, filesys.NewSeaweedFileSystem( + *mountOptions.filer, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB)) if err != nil { fuse.Unmount(*mountOptions.dir) } diff --git a/weed/command/server.go b/weed/command/server.go index 2425532ac..606845199 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -83,21 +83,13 @@ var ( func init() { serverOptions.cpuprofile = cmdServer.Flag.String("cpuprofile", "", "cpu profile output file") - filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server") filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection") filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") - filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.") filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") - filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit") - filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server") - filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server") - filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379") - filerOptions.redis_password = cmdServer.Flag.String("filer.redis.password", "", "redis password in clear text") - filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server") } func runServer(cmd *Command, args []string) bool { @@ -115,7 +107,7 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - *filerOptions.master = *serverIp + ":" + strconv.Itoa(*masterPort) + master := *serverIp + ":" + strconv.Itoa(*masterPort) filerOptions.ip = serverIp if *filerOptions.defaultReplicaPlacement == "" { @@ -157,15 +149,6 @@ func runServer(cmd *Command, args []string) bool { if *masterMetaFolder == "" { *masterMetaFolder = folders[0] } - if *isStartingFiler { - if *filerOptions.dir == "" { - *filerOptions.dir = *masterMetaFolder + "/filer" - os.MkdirAll(*filerOptions.dir, 0700) - } - if err := util.TestFolderWritable(*filerOptions.dir); err != nil { - glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err) - } - } if err := util.TestFolderWritable(*masterMetaFolder); err != nil { glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err) } @@ -267,7 +250,7 @@ func runServer(cmd *Command, args []string) bool { *serverIp, *volumePort, *volumeServerPublicUrl, folders, maxCounts, volumeNeedleMapKind, - *serverIp+":"+strconv.Itoa(*masterPort), *volumePulse, *serverDataCenter, *serverRack, + []string{master}, *volumePulse, *serverDataCenter, *serverRack, serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, ) diff --git a/weed/command/volume.go b/weed/command/volume.go index a54ffd1fd..407c39eb1 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -26,7 +26,7 @@ type VolumeServerOptions struct { ip *string publicUrl *string bindIp *string - master *string + masters *string pulseSeconds *int idleConnectionTimeout *int maxCpu *int @@ -47,7 +47,7 @@ func init() { v.ip = cmdVolume.Flag.String("ip", "", "ip or server name") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") - v.master = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location") + v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") @@ -132,11 +132,14 @@ func runVolume(cmd *Command, args []string) bool { case "btree": volumeNeedleMapKind = storage.NeedleMapBtree } + + masters := *v.masters + volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, v.folders, v.folderMaxLimits, volumeNeedleMapKind, - *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, + strings.Split(masters, ","), *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readRedirect, ) |
