about summary refs log tree commit diff
path: root/weed
diff options
context:
space:
mode:
author: henry <zhanggm@sugon.com> 2021-01-04 14:50:01 +0800
committer: henry <zhanggm@sugon.com> 2021-01-04 14:50:01 +0800
commit: 97a94eddab407219faf6b5a5f809057cde331eaa (patch)
tree: f0f2111f812daa43dca31218bfa62f3f53ee7bd4 /weed
parent: 14ddd155082a455542a992220d6c61e658420c6b (diff)
parent: 2ce86f308ea4836cf534e50dc1388932253b5cd5 (diff)
download: seaweedfs-97a94eddab407219faf6b5a5f809057cde331eaa.tar.xz
download: seaweedfs-97a94eddab407219faf6b5a5f809057cde331eaa.zip
Merge branch 'master' of https://github.com/fuyouyshengwu/seaweedfs
Diffstat (limited to 'weed')
-rw-r--r-- weed/command/filer.go                                 |   1
-rw-r--r-- weed/command/mount_std.go                             |   3
-rw-r--r-- weed/command/scaffold.go                              |  24
-rw-r--r-- weed/command/server.go                                |   1
-rw-r--r-- weed/command/shell.go                                 |  25
-rw-r--r-- weed/command/upload.go                                |  25
-rw-r--r-- weed/filer/filer.go                                   | 131
-rw-r--r-- weed/filer/filer_buckets.go                           |   2
-rw-r--r-- weed/filer/filer_delete_entry.go                      |   2
-rw-r--r-- weed/filer/filer_notify.go                            |   4
-rw-r--r-- weed/filer/filer_search.go                            |   8
-rw-r--r-- weed/filer/leveldb/leveldb_store.go                   |   6
-rw-r--r-- weed/filer/leveldb/leveldb_store_test.go              |  33
-rw-r--r-- weed/filer/leveldb2/leveldb2_store.go                 |   5
-rw-r--r-- weed/filer/leveldb2/leveldb2_store_test.go            |   6
-rw-r--r-- weed/filer/rocksdb/README.md                          |  41
-rw-r--r-- weed/filer/rocksdb/rocksdb_store.go                   | 293
-rw-r--r-- weed/filer/rocksdb/rocksdb_store_kv.go                |  51
-rw-r--r-- weed/filer/rocksdb/rocksdb_store_test.go              | 117
-rw-r--r-- weed/operation/submit.go                              |   3
-rw-r--r-- weed/s3api/auth_credentials.go                        |  31
-rw-r--r-- weed/server/filer_grpc_server.go                      |   4
-rw-r--r-- weed/server/filer_grpc_server_rename.go               |   2
-rw-r--r-- weed/server/filer_server_handlers_read_dir.go         |   2
-rw-r--r-- weed/server/filer_server_handlers_write_autochunk.go  |   5
-rw-r--r-- weed/server/filer_server_rocksdb.go                   |   7
-rw-r--r-- weed/storage/needle/needle_parse_upload.go            |   4
-rw-r--r-- weed/topology/data_node.go                            |   5
-rw-r--r-- weed/util/constants.go                                |   2
29 files changed, 748 insertions, 95 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a3008eb29..146840e00 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -74,6 +74,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
+ filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
}
var cmdFiler = &Command{
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 83cb352ff..7e75b082d 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return true
}
+ util.LoadConfiguration("security", false)
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
@@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
- util.LoadConfiguration("security", false)
-
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 6cfd46427..600a4a8d4 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = SECURITY_TOML_EXAMPLE
case "master":
content = MASTER_TOML_EXAMPLE
+ case "shell":
+ content = SHELL_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -85,7 +87,13 @@ buckets_folder = "/buckets"
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
enabled = true
-dir = "." # directory to store level db files
+dir = "./filerldb2" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it is using a C wrapper, you need to install rocksdb and build it by yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
[mysql] # or tidb
# CREATE TABLE IF NOT EXISTS filemeta (
@@ -460,4 +468,18 @@ copy_other = 1 # create n x 1 = n actual volumes
treat_replication_as_minimums = false
`
+ SHELL_TOML_EXAMPLE = `
+
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
+
+`
)
diff --git a/weed/command/server.go b/weed/command/server.go
index 7e63f8e8a..bd25f94b1 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -113,6 +113,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 6dd768f47..c9976e809 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -11,12 +11,14 @@ import (
var (
shellOptions shell.ShellOptions
shellInitialFiler *string
+ shellCluster *string
)
func init() {
cmdShell.Run = runShell // break init cycle
- shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
- shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
+ shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333")
+ shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888")
+ shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml")
}
var cmdShell = &Command{
@@ -24,6 +26,8 @@ var cmdShell = &Command{
Short: "run interactive administrative commands",
Long: `run interactive administrative commands.
+ Generate shell.toml via "weed scaffold -config=shell"
+
`,
}
@@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ if *shellOptions.Masters == "" && *shellInitialFiler == "" {
+ util.LoadConfiguration("shell", false)
+ v := util.GetViper()
+ cluster := v.GetString("cluster.default")
+ if *shellCluster != "" {
+ cluster = *shellCluster
+ }
+ if cluster == "" {
+ *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888"
+ } else {
+ *shellOptions.Masters = v.GetString("cluster." + cluster + ".master")
+ *shellInitialFiler = v.GetString("cluster." + cluster + ".filer")
+ }
+ }
+
+ fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
+
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
if err != nil {
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 45b15535b..7115da587 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -1,8 +1,12 @@
package command
import (
+ "context"
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
"os"
"path/filepath"
@@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ defaultCollection, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ if err != nil {
+ fmt.Printf("upload: %v", err)
+ return false
+ }
+ if *upload.replication == "" {
+ *upload.replication = defaultCollection
+ }
+
if len(args) == 0 {
if *upload.dir == "" {
return false
@@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool {
}
return true
}
+
+func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) {
+ err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
+ }
+ replication = resp.DefaultReplication
+ return nil
+ })
+ return
+}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 13dedea1e..920d79da5 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + util.Join(dirParts[:i]...)
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // check the store directly
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: util.FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
- glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.maybeAddBucket(dirEntry)
- f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
- }
-
- } else if !dirEntry.IsDirectory() {
- glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
@@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
*/
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
if oldEntry == nil {
+
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
@@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
+func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
+
+ if level == 0 {
+ return nil
+ }
+
+ dirPath := "/" + util.Join(dirParts[:level]...)
+ // fmt.Printf("%d directory: %+v\n", i, dirPath)
+
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
+
+ // no such existing directory
+ if dirEntry == nil {
+
+ // ensure parent directory
+ if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
+ // create the directory
+ now := time.Now()
+
+ dirEntry = &Entry{
+ FullPath: util.FullPath(dirPath),
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
+ },
+ }
+
+ glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+ mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
+ if mkdirErr != nil {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+ return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
+ }
+ } else {
+ f.maybeAddBucket(dirEntry)
+ f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
+ }
+
+ } else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+ return fmt.Errorf("%s is a file", dirPath)
+ }
+
+ return nil
+}
+
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index 4d4f4abc3..87ad9e452 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -29,7 +29,7 @@ func (f *Filer) LoadBuckets() {
limit := math.MaxInt32
- entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
+ entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index da92c4f4b..0d1fd7e47 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -68,7 +68,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName := ""
includeLastFile := false
for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
if err != nil {
glog.Errorf("list folder %s: %v", entry.FullPath, err)
return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 40755e6a7..563e0eb51 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -113,13 +113,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
+ dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "")
if listDayErr != nil {
return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
+ hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 2fe83b49d..7489d8e34 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -19,12 +19,16 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
return "", restPattern
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) {
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string, namePattern string) (entries []*Entry, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- prefix, restNamePattern := splitPattern(namePattern)
+ prefixInNamePattern, restNamePattern := splitPattern(namePattern)
+ if prefixInNamePattern != "" {
+ prefix = prefixInNamePattern
+ }
var missedCount int
var lastFileName string
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index b879f3a6e..1aefbf5d4 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -170,8 +170,12 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we
func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
+ }
- iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index c5bfb8474..edf629814 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -2,9 +2,11 @@ package leveldb
import (
"context"
+ "fmt"
"io/ioutil"
"os"
"testing"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -49,14 +51,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
@@ -86,3 +88,28 @@ func TestEmptyRoot(t *testing.T) {
}
}
+
+func BenchmarkInsertEntry(b *testing.B) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+ defer os.RemoveAll(dir)
+ store := &LevelDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ entry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ },
+ }
+ store.InsertEntry(ctx, entry)
+ }
+}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 4b41554b9..ceda64153 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -179,7 +179,10 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w
func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount)
- lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart, _ = genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+ }
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index 22c0d6052..a7f5ae95b 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/rocksdb/README.md b/weed/filer/rocksdb/README.md
new file mode 100644
index 000000000..6bae6d34e
--- /dev/null
+++ b/weed/filer/rocksdb/README.md
@@ -0,0 +1,41 @@
+# Prepare the compilation environment on linux
+- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
+- sudo apt-get update -qq
+- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq
+- export CXX="g++-6" CC="gcc-6"
+
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+
+# Prepare the compilation environment on mac os
+```
+brew install snappy
+```
+
+# install rocksdb:
+```
+ export ROCKSDB_HOME=/Users/chris/dev/rocksdb
+
+ git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME
+ pushd $ROCKSDB_HOME
+ make clean
+ make install-static
+ popd
+```
+
+# install gorocksdb
+
+```
+export CGO_CFLAGS="-I$ROCKSDB_HOME/include"
+export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
+
+go get github.com/tecbot/gorocksdb
+```
+# compile with rocksdb
+
+```
+cd ~/go/src/github.com/chrislusf/seaweedfs/weed
+go install -tags rocksdb
+```
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
new file mode 100644
index 000000000..a8992cf03
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -0,0 +1,293 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tecbot/gorocksdb"
+ "io"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RocksDBStore{})
+}
+
+type RocksDBStore struct {
+ path string
+ db *gorocksdb.DB
+}
+
+func (store *RocksDBStore) GetName() string {
+ return "rocksdb"
+}
+
+func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *RocksDBStore) initialize(dir string) (err error) {
+ glog.Infof("filer store rocksdb dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+
+ options := gorocksdb.NewDefaultOptions()
+ options.SetCreateIfMissing(true)
+ store.db, err = gorocksdb.OpenDb(options, dir)
+
+ return
+}
+
+func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ wo := gorocksdb.NewDefaultWriteOptions()
+ err = store.db.Put(wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ data, err := store.db.GetBytes(ro, key)
+
+ if data == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ wo := gorocksdb.NewDefaultWriteOptions()
+ err = store.db.Delete(wo, key)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ batch := new(gorocksdb.WriteBatch)
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ ro.SetFillCache(false)
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
+ batch.Delete(key)
+ return true
+ })
+ if err != nil {
+ return fmt.Errorf("delete list %s : %v", fullpath, err)
+ }
+
+ wo := gorocksdb.NewDefaultWriteOptions()
+ err = store.db.Write(wo, batch)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int, fn func(key, value []byte) bool) error {
+
+ if len(lastKey) == 0 {
+ iter.Seek(prefix)
+ } else {
+ iter.Seek(lastKey)
+
+ if !includeLastKey {
+ k := iter.Key()
+ v := iter.Value()
+ key := k.Data()
+ defer k.Free()
+ defer v.Free()
+
+ if !bytes.HasPrefix(key, prefix) {
+ return nil
+ }
+
+ if bytes.Equal(key, lastKey) {
+ iter.Next()
+ }
+
+ }
+ }
+
+ i := 0
+ for ; iter.Valid(); iter.Next() {
+
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+
+ k := iter.Key()
+ v := iter.Value()
+ key := k.Data()
+ value := v.Data()
+
+ if !bytes.HasPrefix(key, prefix) {
+ k.Free()
+ v.Free()
+ break
+ }
+
+ ret := fn(key, value)
+
+ k.Free()
+ v.Free()
+
+ if !ret {
+ break
+ }
+
+ }
+
+ if err := iter.Err(); err != nil {
+ return fmt.Errorf("prefix scan iterator: %v", err)
+ }
+ return nil
+}
+
+func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
+ limit int) (entries []*filer.Entry, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+}
+
+func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(fullpath, startFileName)
+ }
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ ro.SetFillCache(false)
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, lastFileStart, inclusive, limit, func(key, value []byte) bool {
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ return true
+ }
+ limit--
+ if limit < 0 {
+ return false
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(value)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return false
+ }
+ entries = append(entries, entry)
+ return true
+ })
+ if err != nil {
+ return entries, fmt.Errorf("prefix list %s : %v", fullpath, err)
+ }
+
+ return entries, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory, and use last byte for partitioning
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ return b
+}
+
+func (store *RocksDBStore) Shutdown() {
+ store.db.Close()
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go
new file mode 100644
index 000000000..093a905e8
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_kv.go
@@ -0,0 +1,51 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ wo := gorocksdb.NewDefaultWriteOptions()
+ err = store.db.Put(wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ value, err = store.db.GetBytes(ro, key)
+
+ if value == nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get: %v", err)
+ }
+
+ return
+}
+
+func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ wo := gorocksdb.NewDefaultWriteOptions()
+ err = store.db.Delete(wo, key)
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
new file mode 100644
index 000000000..5c48e9bd9
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -0,0 +1,117 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// TestCreateAndFind creates a deeply nested entry in a fresh RocksDB-backed
+// filer, looks it up again, and lists its parent and the root directory.
+func TestCreateAndFind(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	// errors from TempDir/initialize are ignored here — presumably acceptable
+	// for a test, but a failed initialize would surface as confusing failures
+	// below; TODO confirm
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	store.initialize(dir)
+	testFiler.SetStore(store)
+
+	fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+	ctx := context.Background()
+
+	entry1 := &filer.Entry{
+		FullPath: fullpath,
+		Attr: filer.Attr{
+			Mode: 0440,
+			Uid:  1234,
+			Gid:  5678,
+		},
+	}
+
+	if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+		t.Errorf("create entry %v: %v", entry1.FullPath, err)
+		return
+	}
+
+	entry, err := testFiler.FindEntry(ctx, fullpath)
+
+	if err != nil {
+		t.Errorf("find entry: %v", err)
+		return
+	}
+
+	if entry.FullPath != entry1.FullPath {
+		t.Errorf("find wrong entry: %v", entry.FullPath)
+		return
+	}
+
+	// the immediate parent directory must contain exactly the one file
+	entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+	// root must contain exactly the one auto-created top-level directory
+	entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+	if len(entries) != 1 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
+
+// TestEmptyRoot verifies that listing the root of a brand-new store
+// succeeds and yields no entries.
+func TestEmptyRoot(t *testing.T) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	store.initialize(dir)
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	// listing an empty root must not be treated as an error
+	entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+	if err != nil {
+		t.Errorf("list entries: %v", err)
+		return
+	}
+	if len(entries) != 0 {
+		t.Errorf("list entries count: %v", len(entries))
+		return
+	}
+
+}
+
+// BenchmarkInsertEntry measures raw InsertEntry throughput against a
+// freshly initialized RocksDB-backed store.
+func BenchmarkInsertEntry(b *testing.B) {
+	testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+	dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+	defer os.RemoveAll(dir)
+	store := &RocksDBStore{}
+	store.initialize(dir)
+	testFiler.SetStore(store)
+
+	ctx := context.Background()
+
+	b.ReportAllocs()
+
+	for i := 0; i < b.N; i++ {
+		entry := &filer.Entry{
+			FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+			Attr: filer.Attr{
+				Crtime: time.Now(),
+				Mtime:  time.Now(),
+				Mode:   os.FileMode(0644),
+			},
+		}
+		// fail fast instead of silently timing a broken store
+		if err := store.InsertEntry(ctx, entry); err != nil {
+			b.Fatalf("insert entry: %v", err)
+		}
+	}
+}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 25843c892..e785b68a9 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -32,7 +32,7 @@ type FilePart struct {
type SubmitResult struct {
FileName string `json:"fileName,omitempty"`
- FileUrl string `json:"fileUrl,omitempty"`
+ FileUrl string `json:"url,omitempty"`
Fid string `json:"fid,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
@@ -69,6 +69,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
+ file.Ttl = ttl
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index c305fee6f..b8af6381a 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -156,7 +156,36 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
// check whether the request has valid access keys
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) {
- identity, s3Err := iam.authUser(r)
+ var identity *Identity
+ var s3Err s3err.ErrorCode
+ var found bool
+ switch getRequestAuthType(r) {
+ case authTypeStreamingSigned:
+ return identity, s3err.ErrNone
+ case authTypeUnknown:
+ glog.V(3).Infof("unknown auth type")
+ return identity, s3err.ErrAccessDenied
+ case authTypePresignedV2, authTypeSignedV2:
+ glog.V(3).Infof("v2 auth type")
+ identity, s3Err = iam.isReqAuthenticatedV2(r)
+ case authTypeSigned, authTypePresigned:
+ glog.V(3).Infof("v4 auth type")
+ identity, s3Err = iam.reqSignatureV4Verify(r)
+ case authTypePostPolicy:
+ glog.V(3).Infof("post policy auth type")
+ return identity, s3err.ErrNone
+ case authTypeJWT:
+ glog.V(3).Infof("jwt auth type")
+ return identity, s3err.ErrNotImplemented
+ case authTypeAnonymous:
+ identity, found = iam.lookupAnonymous()
+ if !found {
+ return identity, s3err.ErrAccessDenied
+ }
+ default:
+ return identity, s3err.ErrNotImplemented
+ }
+
if s3Err != s3err.ErrNone {
return identity, s3Err
}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5f1b2d819..efc06f0d6 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -61,7 +61,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix, "")
if err != nil {
return err
@@ -326,7 +326,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
- if err != nil {
+ if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
}
return resp, nil
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index fa86737ac..a0c9aedb9 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
+ entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "")
if err != nil {
return err
}
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index f303ba1d4..ba01ce4f7 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
namePattern := r.FormValue("namePattern")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern)
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "", namePattern)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index eee39152b..fe4e68140 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -111,7 +111,10 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
- contentType := ""
+ contentType := r.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
if err != nil {
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
new file mode 100644
index 000000000..5fcc7e88f
--- /dev/null
+++ b/weed/server/filer_server_rocksdb.go
@@ -0,0 +1,7 @@
+// +build rocksdb
+
+package weed_server
+
+import (
+ _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb"
+)
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 8f457be1d..7201503f1 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -193,9 +193,9 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
mtype = contentType
}
- pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
- // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
}
+ pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+ // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
return
}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 0a4df63d0..eaed51654 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -151,7 +151,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro
}
func (dn *DataNode) GetDataCenter() *DataCenter {
- return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+ rack := dn.Parent()
+ dcNode := rack.Parent()
+ dcValue := dcNode.GetValue()
+ return dcValue.(*DataCenter)
}
func (dn *DataNode) GetRack() *Rack {
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 95370746b..cfb33516b 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 17)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 19)
COMMIT = ""
)