Diffstat (limited to 'weed')
-rw-r--r--  weed/command/command.go | 3
-rw-r--r--  weed/command/filer.go | 7
-rw-r--r--  weed/command/filer_cat.go | 118
-rw-r--r--  weed/command/filer_copy.go | 2
-rw-r--r--  weed/command/filer_meta_tail.go | 201
-rw-r--r--  weed/command/filer_replication.go | 3
-rw-r--r--  weed/command/mount.go | 2
-rw-r--r--  weed/command/mount_std.go | 5
-rw-r--r--  weed/command/s3.go | 17
-rw-r--r--  weed/command/scaffold.go | 84
-rw-r--r--  weed/command/server.go | 10
-rw-r--r--  weed/command/shell.go | 25
-rw-r--r--  weed/command/upload.go | 25
-rw-r--r--  weed/command/watch.go | 113
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go | 194
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store_kv.go | 23
-rw-r--r--  weed/filer/cassandra/cassandra_store.go | 22
-rw-r--r--  weed/filer/configuration.go | 6
-rw-r--r--  weed/filer/elastic/v7/elastic_store.go | 60
-rw-r--r--  weed/filer/etcd/etcd_store.go | 23
-rw-r--r--  weed/filer/filechunk_manifest.go | 7
-rw-r--r--  weed/filer/filechunks.go | 9
-rw-r--r--  weed/filer/filer.go | 147
-rw-r--r--  weed/filer/filer_buckets.go | 4
-rw-r--r--  weed/filer/filer_delete_entry.go | 69
-rw-r--r--  weed/filer/filer_notify.go | 6
-rw-r--r--  weed/filer/filer_search.go | 70
-rw-r--r--  weed/filer/filerstore.go | 6
-rw-r--r--  weed/filer/filerstore_translate_path.go | 24
-rw-r--r--  weed/filer/filerstore_wrapper.go | 62
-rw-r--r--  weed/filer/hbase/hbase_store.go | 19
-rw-r--r--  weed/filer/hbase/hbase_store_kv.go | 3
-rw-r--r--  weed/filer/leveldb/leveldb_store.go | 28
-rw-r--r--  weed/filer/leveldb/leveldb_store_test.go | 33
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go | 27
-rw-r--r--  weed/filer/leveldb2/leveldb2_store_test.go | 6
-rw-r--r--  weed/filer/leveldb3/leveldb3_store.go | 375
-rw-r--r--  weed/filer/leveldb3/leveldb3_store_kv.go | 46
-rw-r--r--  weed/filer/leveldb3/leveldb3_store_test.go | 88
-rw-r--r--  weed/filer/mongodb/mongodb_store.go | 22
-rw-r--r--  weed/filer/mysql/mysql_sql_gen.go | 52
-rw-r--r--  weed/filer/mysql/mysql_store.go | 23
-rw-r--r--  weed/filer/mysql2/mysql2_store.go | 82
-rw-r--r--  weed/filer/postgres/postgres_sql_gen.go | 53
-rw-r--r--  weed/filer/postgres/postgres_store.go | 18
-rw-r--r--  weed/filer/postgres2/postgres2_store.go | 87
-rw-r--r--  weed/filer/reader_at.go | 8
-rw-r--r--  weed/filer/redis/redis_cluster_store.go | 6
-rw-r--r--  weed/filer/redis/redis_store.go | 2
-rw-r--r--  weed/filer/redis/universal_redis_store.go | 49
-rw-r--r--  weed/filer/redis/universal_redis_store_kv.go | 8
-rw-r--r--  weed/filer/redis2/redis_cluster_store.go | 6
-rw-r--r--  weed/filer/redis2/redis_store.go | 2
-rw-r--r--  weed/filer/redis2/universal_redis_store.go | 51
-rw-r--r--  weed/filer/redis2/universal_redis_store_kv.go | 8
-rw-r--r--  weed/filer/rocksdb/README.md | 41
-rw-r--r--  weed/filer/rocksdb/rocksdb_store.go | 302
-rw-r--r--  weed/filer/rocksdb/rocksdb_store_kv.go | 47
-rw-r--r--  weed/filer/rocksdb/rocksdb_store_test.go | 117
-rw-r--r--  weed/filer/rocksdb/rocksdb_ttl.go | 40
-rw-r--r--  weed/filer/stream.go | 8
-rw-r--r--  weed/filesys/dir.go | 110
-rw-r--r--  weed/filesys/filehandle.go | 2
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go | 16
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go | 7
-rw-r--r--  weed/messaging/broker/broker_grpc_server_subscribe.go | 26
-rw-r--r--  weed/notification/configuration.go | 5
-rw-r--r--  weed/operation/submit.go | 3
-rw-r--r--  weed/replication/sink/filersink/filer_sink.go | 3
-rw-r--r--  weed/s3api/auth_credentials.go | 31
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go | 2
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go | 10
-rw-r--r--  weed/s3api/s3api_server.go | 1
-rw-r--r--  weed/security/tls.go | 7
-rw-r--r--  weed/server/common.go | 9
-rw-r--r--  weed/server/filer_grpc_server.go | 44
-rw-r--r--  weed/server/filer_grpc_server_rename.go | 4
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go | 52
-rw-r--r--  weed/server/filer_server.go | 5
-rw-r--r--  weed/server/filer_server_handlers_read.go | 17
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go | 3
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 109
-rw-r--r--  weed/server/filer_server_rocksdb.go | 7
-rw-r--r--  weed/server/volume_server_handlers_read.go | 5
-rw-r--r--  weed/shell/command_s3_configure.go | 2
-rw-r--r--  weed/storage/backend/backend.go | 4
-rw-r--r--  weed/storage/needle/needle_parse_upload.go | 4
-rw-r--r--  weed/topology/data_node.go | 5
-rw-r--r--  weed/util/chunk_cache/chunk_cache_in_memory.go | 2
-rw-r--r--  weed/util/config.go | 65
-rw-r--r--  weed/util/constants.go | 2
-rw-r--r--  weed/util/log_buffer/log_buffer.go | 24
-rw-r--r--  weed/util/log_buffer/log_read.go | 8
-rw-r--r--  weed/wdclient/vid_map.go | 10
94 files changed, 2848 insertions, 760 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 0df22b575..3fa52c922 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -15,6 +15,8 @@ var Commands = []*Command{
cmdDownload,
cmdExport,
cmdFiler,
+ cmdFilerCat,
+ cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
@@ -25,7 +27,6 @@ var Commands = []*Command{
cmdScaffold,
cmdServer,
cmdShell,
- cmdWatch,
cmdUpload,
cmdVersion,
cmdVolume,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index a3008eb29..633c25cac 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -42,7 +42,7 @@ type FilerOptions struct {
cipher *bool
peers *string
metricsHttpPort *int
- cacheToFilerLimit *int
+ saveToFilerLimit *int
defaultLevelDbDirectory *string
}
@@ -64,7 +64,7 @@ func init() {
f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
- f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer
@@ -74,6 +74,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
+ filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
}
var cmdFiler = &Command{
@@ -148,7 +149,7 @@ func (fo *FilerOptions) startFiler() {
Host: *fo.ip,
Port: uint32(*fo.port),
Cipher: *fo.cipher,
- CacheToFilerLimit: int64(*fo.cacheToFilerLimit),
+ SaveToFilerLimit: *fo.saveToFilerLimit,
Filers: peers,
})
if nfs_err != nil {
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
new file mode 100644
index 000000000..a46098b04
--- /dev/null
+++ b/weed/command/filer_cat.go
@@ -0,0 +1,118 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "google.golang.org/grpc"
+ "math"
+ "net/url"
+ "os"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ filerCat FilerCatOptions
+)
+
+type FilerCatOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress string
+ filerClient filer_pb.SeaweedFilerClient
+ output *string
+}
+
+func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+ return func(fileId string) (targetUrls []string, err error) {
+ vid := filer.VolumeId(fileId)
+ resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return nil, err
+ }
+ locations := resp.LocationsMap[vid]
+ for _, loc := range locations.Locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+ }
+ return
+ }
+}
+
+func init() {
+ cmdFilerCat.Run = runFilerCat // break init cycle
+ filerCat.output = cmdFilerCat.Flag.String("o", "", "write to file instead of stdout")
+}
+
+var cmdFilerCat = &Command{
+ UsageLine: "filer.cat [-o <file>] http://localhost:8888/path/to/file",
+ Short: "copy one file to the local file system",
+ Long: `read one file to stdout or write to a file
+
+`,
+}
+
+func runFilerCat(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ if len(args) == 0 {
+ return false
+ }
+ filerSource := args[len(args)-1]
+
+ filerUrl, err := url.Parse(filerSource)
+ if err != nil {
+ fmt.Printf("The last argument should be a URL on filer: %v\n", err)
+ return false
+ }
+ urlPath := filerUrl.Path
+ if strings.HasSuffix(urlPath, "/") {
+ fmt.Printf("The last argument should be a file: %v\n", err)
+ return false
+ }
+
+ filerCat.filerAddress = filerUrl.Host
+ filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ dir, name := util.FullPath(urlPath).DirAndName()
+
+ writer := os.Stdout
+ if *filerCat.output != "" {
+
+ fmt.Printf("saving %s to %s\n", filerSource, *filerCat.output)
+
+ f, err := os.OpenFile(*filerCat.output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
+ if err != nil {
+ fmt.Printf("open file %s: %v\n", *filerCat.output, err)
+ return false
+ }
+ defer f.Close()
+ writer = f
+ }
+
+ pb.WithFilerClient(filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ filerCat.filerClient = client
+
+ return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+ })
+
+ return true
+}
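
With this change, filer.StreamContent no longer needs a full master client: any value whose GetLookupFileIdFunction method returns a wdclient.LookupFileIdFunctionType can resolve chunk file ids to read URLs, which is why filerCat can pass itself as the first argument above. Below is a minimal sketch of the same wiring with a canned resolver; the fixedLookup type is hypothetical, and it assumes the interface StreamContent accepts requires only this one method:

package example

import (
	"math"
	"os"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/wdclient"
)

// fixedLookup resolves chunk file ids from a static map, e.g. for tests
// or offline processing of already-known volume locations.
type fixedLookup map[string][]string

func (f fixedLookup) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
	return func(fileId string) ([]string, error) {
		return f[fileId], nil // unknown ids resolve to no locations
	}
}

// catChunks streams a whole entry to stdout, as filer.cat does above.
func catChunks(chunks []*filer_pb.FileChunk, lookup fixedLookup) error {
	return filer.StreamContent(lookup, os.Stdout, chunks, 0, math.MaxInt64)
}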
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 93248f357..b95df696c 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -92,7 +92,7 @@ func runCopy(cmd *Command, args []string) bool {
}
urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
- fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
+ fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
new file mode 100644
index 000000000..fa0262160
--- /dev/null
+++ b/weed/command/filer_meta_tail.go
@@ -0,0 +1,201 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ jsoniter "github.com/json-iterator/go"
+ "github.com/olivere/elastic/v7"
+ "io"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
+}
+
+var cmdFilerMetaTail = &Command{
+ UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
+ Short: "see recent changes on a filer",
+ Long: `See recent changes on a filer.
+
+ `,
+}
+
+var (
+ tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
+ tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
+ esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
+ esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
+)
+
+func runFilerMetaTail(cmd *Command, args []string) bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ var filterFunc func(dir, fname string) bool
+ if *tailPattern != "" {
+ if strings.Contains(*tailPattern, "/") {
+ println("watch path pattern", *tailPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ } else {
+ println("watch file pattern", *tailPattern)
+ filterFunc = func(dir, fname string) bool {
+ matched, err := filepath.Match(*tailPattern, fname)
+ if err != nil {
+ fmt.Printf("error: %v", err)
+ }
+ return matched
+ }
+ }
+ }
+
+ shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
+ if filterFunc == nil {
+ return true
+ }
+ if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
+ return false
+ }
+ if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
+ return true
+ }
+ if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
+ return true
+ }
+ return false
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
+ return nil
+ }
+ if *esServers != "" {
+ var err error
+ eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
+ if err != nil {
+ fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
+ return false
+ }
+ }
+
+ tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "tail",
+ PathPrefix: *tailTarget,
+ SinceNs: time.Now().Add(-*tailStart).UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if !shouldPrint(resp) {
+ continue
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+ }
+
+ })
+ if tailErr != nil {
+ fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
+ }
+
+ return true
+}
+
+type EsDocument struct {
+ Dir string `json:"dir,omitempty"`
+ Name string `json:"name,omitempty"`
+ IsDirectory bool `json:"isDir,omitempty"`
+ Size uint64 `json:"size,omitempty"`
+ Uid uint32 `json:"uid,omitempty"`
+ Gid uint32 `json:"gid,omitempty"`
+ UserName string `json:"userName,omitempty"`
+ Collection string `json:"collection,omitempty"`
+ Crtime int64 `json:"crtime,omitempty"`
+ Mtime int64 `json:"mtime,omitempty"`
+ Mime string `json:"mime,omitempty"`
+}
+
+func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
+ entry := event.NewEntry
+ dir, name := event.NewParentPath, entry.Name
+ id := util.Md5String([]byte(util.NewFullPath(dir, name)))
+ esEntry := &EsDocument{
+ Dir: dir,
+ Name: name,
+ IsDirectory: entry.IsDirectory,
+ Size: entry.Attributes.FileSize,
+ Uid: entry.Attributes.Uid,
+ Gid: entry.Attributes.Gid,
+ UserName: entry.Attributes.UserName,
+ Collection: entry.Attributes.Collection,
+ Crtime: entry.Attributes.Crtime,
+ Mtime: entry.Attributes.Mtime,
+ Mime: entry.Attributes.Mime,
+ }
+ return esEntry, id
+}
+
+func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
+ options := []elastic.ClientOptionFunc{}
+ options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
+ options = append(options, elastic.SetSniff(false))
+ options = append(options, elastic.SetHealthcheck(false))
+ client, err := elastic.NewClient(options...)
+ if err != nil {
+ return nil, err
+ }
+ return func(resp *filer_pb.SubscribeMetadataResponse) error {
+ event := resp.EventNotification
+ if event.OldEntry != nil &&
+ (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
+ // the entry was deleted, or renamed away (not an in-place update of the same file)
+ dir, name := resp.Directory, event.OldEntry.Name
+ id := util.Md5String([]byte(util.NewFullPath(dir, name)))
+ println("delete", id)
+ _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
+ return err
+ }
+ if event.NewEntry != nil {
+ // add a new file or update the same file
+ esEntry, id := toEsEntry(event)
+ value, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ return err
+ }
+ println(string(value))
+ _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
+ return err
+ }
+ return nil
+ }, nil
+}
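
Once filer.meta.tail mirrors metadata into Elasticsearch, the index can be queried directly with the same client library. A sketch of listing everything under one directory follows; the field name comes from the EsDocument struct above, and an exact match on dir assumes the index mapping treats it as a keyword (with the default dynamic text mapping you would query dir.keyword instead):

package example

import (
	"context"
	"fmt"

	"github.com/olivere/elastic/v7"
)

func listDirFromEs(ctx context.Context, servers []string, dir string) error {
	client, err := elastic.NewClient(
		elastic.SetURL(servers...),
		elastic.SetSniff(false),
		elastic.SetHealthcheck(false),
	)
	if err != nil {
		return err
	}
	result, err := client.Search().
		Index("seaweedfs"). // the default -es.index value
		Query(elastic.NewTermQuery("dir", dir)).
		Size(100).
		Do(ctx)
	if err != nil {
		return err
	}
	for _, hit := range result.Hits.Hits {
		fmt.Printf("%s\n", hit.Source) // raw EsDocument JSON
	}
	return nil
}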
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index 40f2b570b..4f698e375 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -14,7 +14,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink"
"github.com/chrislusf/seaweedfs/weed/replication/sub"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
func init() {
@@ -123,7 +122,7 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
-func validateOneEnabledInput(config *viper.Viper) {
+func validateOneEnabledInput(config *util.ViperProxy) {
enabledInput := ""
for _, input := range sub.NotificationInputs {
if config.GetBool("notification." + input.GetName() + ".enabled") {
diff --git a/weed/command/mount.go b/weed/command/mount.go
index f325cb0a5..fa75919aa 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -43,7 +43,7 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
- mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
+ mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 128, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 83cb352ff..9e955e344 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -58,6 +58,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
return true
}
+ util.LoadConfiguration("security", false)
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
@@ -78,8 +79,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
dir := util.ResolvePath(*option.dir)
chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
- util.LoadConfiguration("security", false)
-
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if dir == "" {
fmt.Printf("Please specify the mount directory via \"-dir\"")
@@ -151,6 +150,8 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
fuse.MaxReadahead(1024 * 128),
fuse.AsyncRead(),
fuse.WritebackCache(),
+ fuse.MaxBackground(128),
+ fuse.CongestionThreshold(128),
}
options = append(options, osSpecificMountOptions()...)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index ed5bb0b80..d8e3e306b 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -23,13 +23,14 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
- metricsHttpPort *int
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
+ allowEmptyFolder *bool
}
func init() {
@@ -41,6 +42,7 @@ func init() {
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+ s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
}
var cmdS3 = &Command{
@@ -181,6 +183,7 @@ func (s3opt *S3Options) startS3Server() bool {
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
+ AllowEmptyFolder: *s3opt.allowEmptyFolder,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 6cfd46427..8b74274e5 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -44,6 +44,8 @@ func runScaffold(cmd *Command, args []string) bool {
content = SECURITY_TOML_EXAMPLE
case "master":
content = MASTER_TOML_EXAMPLE
+ case "shell":
+ content = SHELL_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -85,9 +87,21 @@ buckets_folder = "/buckets"
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
enabled = true
-dir = "." # directory to store level db files
+dir = "./filerldb2" # directory to store level db files
-[mysql] # or tidb
+[leveldb3]
+# similar to leveldb2.
+# each bucket has its own meta store.
+enabled = false
+dir = "./filerldb3" # directory to store level db files
+
+[rocksdb]
+# local on disk, similar to leveldb
+# since it is using a C wrapper, you need to install rocksdb and build it by yourself
+enabled = false
+dir = "./filerrdb" # directory to store rocksdb files
+
+[mysql] # or memsql, tidb
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
# name VARCHAR(1000) COMMENT 'directory or file name',
@@ -104,9 +118,31 @@ password = ""
database = "" # create or use an existing database
connection_max_idle = 2
connection_max_open = 100
+connection_max_lifetime_seconds = 0
+interpolateParams = false
+
+[mysql2] # or memsql, tidb
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS %s (
+ dirhash BIGINT,
+ name VARCHAR(1000),
+ directory TEXT,
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+ ) DEFAULT CHARSET=utf8;
+"""
+hostname = "localhost"
+port = 3306
+username = "root"
+password = ""
+database = "" # create or use an existing database
+connection_max_idle = 2
+connection_max_open = 100
+connection_max_lifetime_seconds = 0
interpolateParams = false
-[postgres] # or cockroachdb
+[postgres] # or cockroachdb, YugabyteDB
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT,
# name VARCHAR(65535),
@@ -119,7 +155,29 @@ hostname = "localhost"
port = 5432
username = "postgres"
password = ""
-database = "" # create or use an existing database
+database = "postgres" # create or use an existing database
+schema = ""
+sslmode = "disable"
+connection_max_idle = 100
+connection_max_open = 100
+
+[postgres2]
+enabled = false
+createTable = """
+ CREATE TABLE IF NOT EXISTS %s (
+ dirhash BIGINT,
+ name VARCHAR(65535),
+ directory VARCHAR(65535),
+ meta bytea,
+ PRIMARY KEY (dirhash, name)
+ );
+"""
+hostname = "localhost"
+port = 5432
+username = "postgres"
+password = ""
+database = "postgres" # create or use an existing database
+schema = ""
sslmode = "disable"
connection_max_idle = 100
connection_max_open = 100
@@ -166,9 +224,9 @@ addresses = [
]
password = ""
# allows reads from slave servers or the master, but all writes still go to the master
-readOnly = true
+readOnly = false
# automatically use the closest Redis server for reads
-routeByLatency = true
+routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
@@ -460,4 +518,18 @@ copy_other = 1 # create n x 1 = n actual volumes
treat_replication_as_minimums = false
`
+ SHELL_TOML_EXAMPLE = `
+
+[cluster]
+default = "c1"
+
+[cluster.c1]
+master = "localhost:9333" # comma-separated master servers
+filer = "localhost:8888" # filer host and port
+
+[cluster.c2]
+master = ""
+filer = ""
+
+`
)
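
In the new mysql2 and postgres2 stores, the createTable template's %s placeholder is filled in with the table name: the default filemeta table, or the bucket name when the store manages one table per bucket. A quick illustration of the expansion (the bucket name is hypothetical, and this presumes the template is applied with a plain fmt.Sprintf, matching the GetSqlCreateTable hook introduced in abstract_sql below):

package main

import "fmt"

func main() {
	createTable := `CREATE TABLE IF NOT EXISTS %s (
  dirhash   BIGINT,
  name      VARCHAR(1000),
  directory TEXT,
  meta      LONGBLOB,
  PRIMARY KEY (dirhash, name)
) DEFAULT CHARSET=utf8;`

	// for a bucket named "photos" under /buckets/, the store would issue:
	fmt.Printf(createTable+"\n", "photos")
}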
diff --git a/weed/command/server.go b/weed/command/server.go
index 7e63f8e8a..9976db2ea 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -61,6 +61,7 @@ var (
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ isStartingMasterServer = cmdServer.Flag.Bool("master", true, "whether to start master server")
isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -94,7 +95,7 @@ func init() {
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
- filerOptions.cacheToFilerLimit = cmdServer.Flag.Int("filer.cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -113,6 +114,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
@@ -223,7 +225,11 @@ func runServer(cmd *Command, args []string) bool {
}
- startMaster(masterOptions, serverWhiteList)
+ if *isStartingMasterServer {
+ go startMaster(masterOptions, serverWhiteList)
+ }
+
+ select {}
return true
}
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 6dd768f47..c9976e809 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -11,12 +11,14 @@ import (
var (
shellOptions shell.ShellOptions
shellInitialFiler *string
+ shellCluster *string
)
func init() {
cmdShell.Run = runShell // break init cycle
- shellOptions.Masters = cmdShell.Flag.String("master", "localhost:9333", "comma-separated master servers")
- shellInitialFiler = cmdShell.Flag.String("filer", "localhost:8888", "filer host and port")
+ shellOptions.Masters = cmdShell.Flag.String("master", "", "comma-separated master servers, e.g. localhost:9333")
+ shellInitialFiler = cmdShell.Flag.String("filer", "", "filer host and port, e.g. localhost:8888")
+ shellCluster = cmdShell.Flag.String("cluster", "", "cluster defined in shell.toml")
}
var cmdShell = &Command{
@@ -24,6 +26,8 @@ var cmdShell = &Command{
Short: "run interactive administrative commands",
Long: `run interactive administrative commands.
+ Generate shell.toml via "weed scaffold -config=shell"
+
`,
}
@@ -32,6 +36,23 @@ func runShell(command *Command, args []string) bool {
util.LoadConfiguration("security", false)
shellOptions.GrpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ if *shellOptions.Masters == "" && *shellInitialFiler == "" {
+ util.LoadConfiguration("shell", false)
+ v := util.GetViper()
+ cluster := v.GetString("cluster.default")
+ if *shellCluster != "" {
+ cluster = *shellCluster
+ }
+ if cluster == "" {
+ *shellOptions.Masters, *shellInitialFiler = "localhost:9333", "localhost:8888"
+ } else {
+ *shellOptions.Masters = v.GetString("cluster." + cluster + ".master")
+ *shellInitialFiler = v.GetString("cluster." + cluster + ".filer")
+ }
+ }
+
+ fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
+
var err error
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
if err != nil {
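
Taken together with the scaffold change above, the shell now resolves its targets in this order: explicit -master/-filer flags, then the -cluster flag, then cluster.default from shell.toml, and finally the localhost defaults. A typical setup might look like this (assuming shell.toml is saved somewhere weed loads configuration from, such as the working directory):

weed scaffold -config=shell > shell.toml
weed shell -cluster=c1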
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 45b15535b..7115da587 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -1,8 +1,12 @@
package command
import (
+ "context"
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
"os"
"path/filepath"
@@ -65,6 +69,15 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ if err != nil {
+ fmt.Printf("upload: %v", err)
+ return false
+ }
+ if *upload.replication == "" {
+ *upload.replication = defaultReplication
+ }
+
if len(args) == 0 {
if *upload.dir == "" {
return false
@@ -104,3 +117,15 @@ func runUpload(cmd *Command, args []string) bool {
}
return true
}
+
+func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) {
+ err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
+ }
+ replication = resp.DefaultReplication
+ return nil
+ })
+ return
+}
diff --git a/weed/command/watch.go b/weed/command/watch.go
deleted file mode 100644
index fd7dd6fb2..000000000
--- a/weed/command/watch.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package command
-
-import (
- "context"
- "fmt"
- "io"
- "path/filepath"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func init() {
- cmdWatch.Run = runWatch // break init cycle
-}
-
-var cmdWatch = &Command{
- UsageLine: "watch [-filer=localhost:8888] [-target=/]",
- Short: "see recent changes on a filer",
- Long: `See recent changes on a filer.
-
- `,
-}
-
-var (
- watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port")
- watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
- watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
- watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
-)
-
-func runWatch(cmd *Command, args []string) bool {
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
- var filterFunc func(dir, fname string) bool
- if *watchPattern != "" {
- if strings.Contains(*watchPattern, "/") {
- println("watch path pattern", *watchPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*watchPattern, dir+"/"+fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- } else {
- println("watch file pattern", *watchPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*watchPattern, fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- }
- }
-
- shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
- if filterFunc == nil {
- return true
- }
- if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
- return false
- }
- if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
- return true
- }
- if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
- return true
- }
- return false
- }
-
- watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "watch",
- PathPrefix: *watchTarget,
- SinceNs: time.Now().Add(-*watchStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if !shouldPrint(resp) {
- continue
- }
- fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
- }
-
- })
- if watchErr != nil {
- fmt.Printf("watch %s: %v\n", *watchFiler, watchErr)
- }
-
- return true
-}
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index da104358b..91b0bc98f 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -9,19 +9,33 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
+ "sync"
)
+type SqlGenerator interface {
+ GetSqlInsert(bucket string) string
+ GetSqlUpdate(bucket string) string
+ GetSqlFind(bucket string) string
+ GetSqlDelete(bucket string) string
+ GetSqlDeleteFolderChildren(bucket string) string
+ GetSqlListExclusive(bucket string) string
+ GetSqlListInclusive(bucket string) string
+ GetSqlCreateTable(bucket string) string
+ GetSqlDropTable(bucket string) string
+}
+
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlDeleteFolderChildren string
- SqlListExclusive string
- SqlListInclusive string
+ SqlGenerator
+ DB *sql.DB
+ SupportBucketTable bool
+ dbs map[string]bool
+ dbsLock sync.Mutex
}
+const (
+ DEFAULT_TABLE = "filemeta"
+)
+
type TxOrDB interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
@@ -52,16 +66,65 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
+
+ shortPath = fullpath
+ bucket = DEFAULT_TABLE
+
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx
+ txOrDB = tx
+ } else {
+ txOrDB = store.DB
+ }
+
+ if !store.SupportBucketTable {
+ return
+ }
+
+ if !strings.HasPrefix(string(fullpath), "/buckets/") {
+ return
+ }
+
+ // detect bucket
+ bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 && !isForChildren {
+ return
+ }
+ bucket = bucketAndObjectKey
+ shortPath = "/"
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = util.FullPath(bucketAndObjectKey[t:])
+ }
+
+ if isValidBucket(bucket) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if store.dbs == nil {
+ store.dbs = make(map[string]bool)
+ }
+
+ if _, found := store.dbs[bucket]; !found {
+ if err = store.CreateTable(ctx, bucket); err == nil {
+ store.dbs[bucket] = true
+ }
+ }
+
}
- return store.DB
+
+ return
}
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
- dir, name := entry.FullPath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@@ -71,7 +134,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
meta = util.MaybeGzipData(meta)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
if err == nil {
return
}
@@ -84,7 +147,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
- res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
}
@@ -99,13 +162,18 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
- dir, name := entry.FullPath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -119,8 +187,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
- dir, name := fullpath.DirAndName()
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
@@ -142,9 +215,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
- dir, name := fullpath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -159,7 +237,23 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ if isValidBucket(bucket) && shortPath == "/" {
+ if err = store.deleteTable(ctx, bucket); err == nil {
+ store.dbsLock.Lock()
+ delete(store.dbs, bucket)
+ store.dbsLock.Unlock()
+ return nil
+ } else {
+ return err
+ }
+ }
+
+ res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
}
@@ -172,15 +266,21 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil
}
-func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- sqlText := store.SqlListExclusive
- if inclusive {
- sqlText = store.SqlListInclusive
+func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
+ if err != nil {
+ return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
+ }
+
+ sqlText := store.GetSqlListExclusive(bucket)
+ if includeStartFile {
+ sqlText = store.GetSqlListInclusive(bucket)
}
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix+"%", limit)
+ rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
@@ -188,28 +288,52 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
var name string
var data []byte
if err = rows.Scan(&name, &data); err != nil {
- glog.V(0).Infof("scan %s : %v", fullpath, err)
- return nil, fmt.Errorf("scan %s: %v", fullpath, err)
+ glog.V(0).Infof("scan %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
+ lastFileName = name
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(dirPath), name),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
- return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ }
+
+ if !eachEntryFunc(entry) {
+ break
}
- entries = append(entries, entry)
}
- return entries, nil
+ return lastFileName, nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *AbstractSqlStore) Shutdown() {
store.DB.Close()
}
+
+func isValidBucket(bucket string) bool {
+ return bucket != DEFAULT_TABLE && bucket != ""
+}
+
+func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
+ if !store.SupportBucketTable {
+ return nil
+ }
+ _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
+ return err
+}
+
+func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
+ if !store.SupportBucketTable {
+ return nil
+ }
+ _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
+ return err
+}
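
Each SQL dialect now supplies its statements through the SqlGenerator interface, with the bucket name doubling as the table name once SupportBucketTable is on. The mysql_sql_gen.go and postgres_sql_gen.go files added in this commit are not shown on this page; the following is a hypothetical sketch of what a MySQL-flavored generator could look like, with placeholder order matching the ExecContext/QueryContext calls above:

package mysql

import "fmt"

// SqlGenMysql renders per-table statements for AbstractSqlStore. The
// create/drop templates would come from configuration (e.g. the
// createTable entry in filer.toml) or be fixed strings.
type SqlGenMysql struct {
	CreateTableSqlTemplate string
	DropTableSqlTemplate   string
}

func (g *SqlGenMysql) GetSqlInsert(tableName string) string {
	return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", tableName)
}

func (g *SqlGenMysql) GetSqlUpdate(tableName string) string {
	return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", tableName)
}

func (g *SqlGenMysql) GetSqlFind(tableName string) string {
	return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", tableName)
}

func (g *SqlGenMysql) GetSqlDelete(tableName string) string {
	return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", tableName)
}

func (g *SqlGenMysql) GetSqlDeleteFolderChildren(tableName string) string {
	return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", tableName)
}

func (g *SqlGenMysql) GetSqlListExclusive(tableName string) string {
	return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name LIKE ? ORDER BY name ASC LIMIT ?", tableName)
}

func (g *SqlGenMysql) GetSqlListInclusive(tableName string) string {
	return fmt.Sprintf("SELECT name, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name LIKE ? ORDER BY name ASC LIMIT ?", tableName)
}

func (g *SqlGenMysql) GetSqlCreateTable(tableName string) string {
	return fmt.Sprintf(g.CreateTableSqlTemplate, tableName)
}

func (g *SqlGenMysql) GetSqlDropTable(tableName string) string {
	return fmt.Sprintf(g.DropTableSqlTemplate, tableName)
}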
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index 792a45ff4..03b016c76 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -13,9 +13,14 @@ import (
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil {
return
}
@@ -28,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("kv insert falls back to update: %s", err)
- res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv upsert: %s", err)
}
@@ -43,8 +48,13 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
+ row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value)
@@ -61,9 +71,14 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv delete: %s", err)
}
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index 49f5625d9..fd2ce91a6 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -168,41 +168,43 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil
}
-func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
}
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
- if inclusive {
+ if includeStartFile {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
}
var data []byte
var name string
- iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
+ iter := store.session.Query(cqlStr, string(dirPath), startFileName, limit+1).Iter()
for iter.Scan(&name, &data) {
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(dirPath), name),
}
+ lastFileName = name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
if err := iter.Close(); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- return entries, err
+ return lastFileName, err
}
func (store *CassandraStore) Shutdown() {
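
The same listing refactor runs through every store in this commit: instead of materializing a slice, a store streams entries into a filer.ListEachEntryFunc callback (returning false stops the scan early) and reports the last file name it saw, so callers can page without the store holding a whole directory in memory. A caller-side sketch of draining a directory (collectAll is a hypothetical helper, written against the updated FilerStore signature):

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// collectAll pages through a directory via lastFileName, stopping once a
// page comes back short of the requested limit.
func collectAll(ctx context.Context, store filer.FilerStore, dir util.FullPath) (entries []*filer.Entry, err error) {
	const pageSize = int64(1024)
	startFileName, includeStart := "", true
	for {
		var count int64
		lastFileName, listErr := store.ListDirectoryEntries(ctx, dir, startFileName, includeStart, pageSize,
			func(entry *filer.Entry) bool {
				entries = append(entries, entry)
				count++
				return true // returning false would stop the scan early
			})
		if listErr != nil {
			return entries, listErr
		}
		if count < pageSize {
			return entries, nil
		}
		startFileName, includeStart = lastFileName, false
	}
}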
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
index a6f18709e..9ef2f3e0f 100644
--- a/weed/filer/configuration.go
+++ b/weed/filer/configuration.go
@@ -2,7 +2,7 @@ package filer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/util"
"os"
"reflect"
"strings"
@@ -12,7 +12,7 @@ var (
Stores []FilerStore
)
-func (f *Filer) LoadConfiguration(config *viper.Viper) {
+func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
validateOneEnabledStore(config)
@@ -79,7 +79,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
}
-func validateOneEnabledStore(config *viper.Viper) {
+func validateOneEnabledStore(config *util.ViperProxy) {
enabledStore := ""
for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") {
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index ec88e10a5..1e7f55599 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
@@ -187,28 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
- for _, entry := range entries {
- store.DeleteEntry(ctx, entry.FullPath)
+ _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
+ if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
+ glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
+ return false
}
- }
- return nil
+ return true
+ })
+ return
}
-func (store *ElasticStore) ListDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
- if string(fullpath) == "/" {
- return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
+func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ if string(dirPath) == "/" {
+ return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
}
- return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
+ return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
}
-func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
- return entries, err
+ return
}
for _, index := range indexResult {
if index.Index == indexKV {
@@ -218,6 +218,7 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
fileName := getFileName(entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
@@ -225,24 +226,25 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if limit < 0 {
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
}
- return entries, nil
+ return
}
func (store *ElasticStore) listDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true
index := getIndex(fullpath)
nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath))
- if _, err := store.client.Refresh(index).Do(ctx); err != nil {
+ if _, err = store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx)
- return entries, nil
+ return
}
}
for {
@@ -250,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
} else {
fullPath := string(fullpath) + "/" + startFileName
@@ -260,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
}
first = false
@@ -272,21 +274,21 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit--
if limit < 0 {
- return entries, nil
+ return lastFileName, nil
}
nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
- entries = append(entries, esEntry.Entry)
+ if !eachEntryFunc(esEntry.Entry) {
+ break
+ }
}
}
- if len(result.Hits.Hits) < store.maxPageSize {
- break
- }
}
- return entries, nil
+ return
}
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 634fba1eb..8159c634d 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -101,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
resp, err := store.client.Get(ctx, string(key))
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
if len(resp.Kvs) == 0 {
@@ -139,17 +139,17 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
-func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, "")
resp, err := store.client.Get(ctx, string(directoryPrefix),
clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
for _, kv := range resp.Kvs {
@@ -157,25 +157,28 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index f5ab36d37..845bfaec1 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"time"
@@ -38,7 +39,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
@@ -63,7 +64,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return
}
-func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
@@ -84,7 +85,7 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index c75a35f79..68f308a51 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"sort"
"sync"
@@ -52,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
}
-func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -71,7 +72,7 @@ func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_
return
}
-func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
if aErr != nil {
@@ -116,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
-func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -222,7 +223,7 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 13dedea1e..e59887763 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + util.Join(dirParts[:i]...)
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // check the store directly
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: util.FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
- glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.maybeAddBucket(dirEntry)
- f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
- }
-
- } else if !dirEntry.IsDirectory() {
- glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
@@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
*/
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
if oldEntry == nil {
+
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirectoryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
@@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
+func (f *Filer) ensureParentDirectoryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
+
+ if level == 0 {
+ return nil
+ }
+
+ dirPath := "/" + util.Join(dirParts[:level]...)
+ // fmt.Printf("%d directory: %+v\n", level, dirPath)
+
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
+
+ // no such existing directory
+ if dirEntry == nil {
+
+ // ensure parent directory
+ if err = f.ensureParentDirectoryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
+ // create the directory
+ now := time.Now()
+
+ dirEntry = &Entry{
+ FullPath: util.FullPath(dirPath),
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
+ },
+ }
+
+ glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+ mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
+ if mkdirErr != nil {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+ return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
+ }
+ } else {
+ f.maybeAddBucket(dirEntry)
+ f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
+ }
+
+ } else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+ return fmt.Errorf("%s is a file", dirPath)
+ }
+
+ return nil
+}
+
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
@@ -280,21 +281,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
-func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
- listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if listErr != nil {
- return listedEntries, expiredCount, "", listErr
- }
- for _, entry := range listedEntries {
- lastFileName = entry.Name()
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
- continue
+ return true
}
}
- entries = append(entries, entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return expiredCount, lastFileName, err
}
return
}
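[Annotation] With the old inline loop gone, parent directories are now ensured recursively: ensureParentDirectoryEntry checks from the entry's immediate parent back toward the root, recursing before inserting, so creation happens root-first as the calls unwind. The same shape in isolation — a sketch with store access reduced to two assumed callbacks:

import "strings"

// exists and mkdir stand in for FindEntry and InsertEntry against the store;
// a real implementation must also reject the case where dir exists as a file.
func ensureParents(exists func(string) bool, mkdir func(string) error, parts []string, level int) error {
	if level == 0 {
		return nil // recursion bottoms out at the root
	}
	dir := "/" + strings.Join(parts[1:level], "/")
	if exists(dir) {
		return nil
	}
	// create the grandparents first, then this directory
	if err := ensureParents(exists, mkdir, parts, level-1); err != nil {
		return err
	}
	return mkdir(dir)
}

For /home/chris/file.txt, strings.Split yields ["", "home", "chris", "file.txt"], and ensureParents(exists, mkdir, parts, len(parts)-1) creates /home and then /home/chris, mirroring the store-backed version above.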
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index 4d4f4abc3..ba170f02e 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -27,9 +27,9 @@ func (f *Filer) LoadBuckets() {
buckets: make(map[BucketName]*BucketOption),
}
- limit := math.MaxInt32
+ limit := int64(math.MaxInt32)
- entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
+ entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index da92c4f4b..50a669f40 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -30,7 +30,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
var dirHardLinkIds []HardLinkId
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isFromOtherCluster, signatures)
+ dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -63,46 +63,49 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil
}
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
lastFileName := ""
includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
- if err != nil {
- glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
+ if !isDeletingBucket {
+ for {
+ entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
+ return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
- if sub.IsDirectory() {
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil)
- chunks = append(chunks, dirChunks...)
- hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
- } else {
- f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
- if len(sub.HardLinkId) != 0 {
- // hard link chunk data are deleted separately
- hardlinkIds = append(hardlinkIds, sub.HardLinkId)
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ var dirHardLinkIds []HardLinkId
+ if sub.IsDirectory() {
+ subIsDeletingBucket := f.isBucket(sub)
+ dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil)
+ chunks = append(chunks, dirChunks...)
+ hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
} else {
- chunks = append(chunks, sub.Chunks...)
+ f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+ if len(sub.HardLinkId) != 0 {
+ // hard link chunk data are deleted separately
+ hardlinkIds = append(hardlinkIds, sub.HardLinkId)
+ } else {
+ chunks = append(chunks, sub.Chunks...)
+ }
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, nil, err
}
}
- if err != nil && !ignoreRecursiveError {
- return nil, nil, err
- }
- }
- if len(entries) < PaginationSize {
- break
+ if len(entries) < PaginationSize {
+ break
+ }
}
}
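[Annotation] The new isDeletingBucket flag short-circuits the paginated child scan when an entire bucket is being dropped: the call site above already passes shouldDeleteChunks && !isDeleteCollection, consistent with the bucket's chunk data being reclaimed when its collection is deleted rather than chunk by chunk. Condensed to its control flow (stub callbacks, not the real API):

// collectChildChunks stands in for the paginated listing above; deleteSelf
// for the final per-folder cleanup. Bucket deletes skip the expensive scan.
func deleteFolder(isDeletingBucket bool, collectChildChunks, deleteSelf func() error) error {
	if !isDeletingBucket {
		if err := collectChildChunks(); err != nil {
			return err
		}
	}
	return deleteSelf()
}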
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 40755e6a7..f3a795ad0 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -113,13 +113,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
+ dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "")
if listDayErr != nil {
return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
@@ -170,7 +170,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func
return lastTsNs, err
}
if logEntry.TsNs <= ns {
- return lastTsNs, nil
+ continue
}
// println("each log: ", logEntry.TsNs)
if err := eachLogEntryFn(logEntry); err != nil {
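[Annotation] The one-line change in ReadEachLogEntry is a behavior fix: an entry at or before the start timestamp used to end the whole read, silently dropping any newer entries that follow it in the same file; continue merely skips it. The skip-versus-stop distinction in a trivial sketch:

func replay(tsList []int64, sinceNs int64, apply func(int64)) {
	for _, tsNs := range tsList {
		if tsNs <= sinceNs {
			continue // skip stale records; returning here would drop all later ones
		}
		apply(tsNs)
	}
}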
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index b26959cb0..0a14d3756 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -19,58 +19,72 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
return "", restPattern
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) {
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string) (entries []*Entry, hasMore bool, err error) {
+
+ _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, func(entry *Entry) bool {
+ entries = append(entries, entry)
+ return true
+ })
+
+ hasMore = int64(len(entries)) >= limit+1
+ if hasMore {
+ entries = entries[:limit]
+ }
+
+ return entries, hasMore, err
+}
+
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- prefix, restNamePattern := splitPattern(namePattern)
- var missedCount int
- var lastFileName string
+ prefixInNamePattern, restNamePattern := splitPattern(namePattern)
+ if prefixInNamePattern != "" {
+ prefix = prefixInNamePattern
+ }
+ var missedCount int64
- entries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern)
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc)
for missedCount > 0 && err == nil {
- var makeupEntries []*Entry
- makeupEntries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern)
- for _, entry := range makeupEntries {
- entries = append(entries, entry)
- }
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc)
}
- return entries, err
+ return
}
-func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix, restNamePattern string) (matchedEntries []*Entry, missedCount int, lastFileName string, err error) {
- var foundEntries []*Entry
+func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
- foundEntries, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if err != nil {
- return
- }
if len(restNamePattern) == 0 {
- return foundEntries, 0, lastFileName, nil
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
+ return 0, lastFileName, err
}
- for _, entry := range foundEntries {
+
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
nameToTest := strings.ToLower(entry.Name())
if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched {
- matchedEntries = append(matchedEntries, entry)
+ if !eachEntryFunc(entry) {
+ return false
+ }
} else {
missedCount++
}
+ return true
+ })
+ if err != nil {
+ return
}
return
}
-func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, lastFileName string, err error) {
- var makeupEntries []*Entry
- var expiredCount int
- entries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
+ var expiredCount int64
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
for expiredCount > 0 && err == nil {
- makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
- if err == nil {
- entries = append(entries, makeupEntries...)
- }
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix, eachEntryFunc)
}
return
}
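[Annotation] ListDirectoryEntries is now a thin adapter over StreamListDirectoryEntries and derives hasMore with the limit+1 probe: request one extra entry, and its arrival proves another page exists. The pattern in isolation, with the stream reduced to an assumed callback-driven lister:

func collectPage(stream func(limit int64, each func(string) bool) error, limit int64) (page []string, hasMore bool, err error) {
	err = stream(limit+1, func(name string) bool {
		page = append(page, name)
		return true
	})
	if hasMore = int64(len(page)) > limit; hasMore {
		page = page[:limit] // the extra entry only proves another page exists
	}
	return
}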
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index f1e6c6c35..8955a25c7 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -13,6 +13,8 @@ var (
ErrKvNotFound = errors.New("kv: not found")
)
+type ListEachEntryFunc func(entry *Entry) bool
+
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
@@ -24,8 +26,8 @@ type FilerStore interface {
FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
- ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
+ ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
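[Annotation] This interface change is what enables resumable pagination: stores return lastFileName, callers feed it back as startFileName with includeStartFile=false, and every implementation is expected to stop scanning when eachEntryFunc returns false. A sketch of the loop a conforming store runs, over an assumed pre-sorted name list:

func listConforming(sortedNames []string, startFileName string, includeStartFile bool, limit int64, each func(string) bool) (lastFileName string, err error) {
	for _, name := range sortedNames { // stands in for a store's ordered scan
		if name < startFileName || (name == startFileName && !includeStartFile) {
			continue
		}
		lastFileName = name // recorded so the caller can resume from here
		if limit--; limit < 0 {
			break
		}
		if !each(name) {
			break // callback requested early termination
		}
	}
	return lastFileName, nil
}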
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
index ea0f9db77..00bf82ed4 100644
--- a/weed/filer/filerstore_translate_path.go
+++ b/weed/filer/filerstore_translate_path.go
@@ -106,32 +106,24 @@ func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp u
return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
}
-func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, err
+ return eachEntryFunc(entry)
+ })
}
-func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, nil
+ return eachEntryFunc(entry)
+ })
}
func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 3206d5ba4..64baac371 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -194,7 +194,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
return actualStore.DeleteFolderChildren(ctx, fp)
}
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
start := time.Now()
@@ -203,18 +203,14 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
+ return eachEntryFunc(entry)
+ })
}
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
start := time.Now()
@@ -222,48 +218,52 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
- entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return eachEntryFunc(entry)
+ })
}
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, nil
+ return lastFileName, err
}
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
- entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
if prefix == "" {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
+ }
+
+ var notPrefixed []*Entry
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
+ if err != nil {
return
}
- count := 0
- var lastFileName string
- notPrefixed := entries
- entries = nil
+ count := int64(0)
for count < limit && len(notPrefixed) > 0 {
for _, entry := range notPrefixed {
- lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ return
+ }
if count >= limit {
break
}
}
}
if count < limit {
- notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ notPrefixed = notPrefixed[:0]
+ _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
if err != nil {
return
}
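[Annotation] When a store answers ErrUnsupportedListDirectoryPrefixed, the wrapper now emulates prefixed listing itself: fetch unfiltered pages, emit only names carrying the prefix, and keep paging from the last unfiltered name until the limit is met. The core of that fallback, assuming a page lister that excludes startFrom from its results:

import "strings"

func prefixFilter(list func(startFrom string, limit int64) ([]string, error), prefix string, limit int64, each func(string) bool) error {
	startFrom, count := "", int64(0)
	for count < limit {
		page, err := list(startFrom, limit)
		if err != nil || len(page) == 0 {
			return err
		}
		startFrom = page[len(page)-1] // the resume point tracks unfiltered names
		for _, name := range page {
			if strings.HasPrefix(name, prefix) {
				count++
				if !each(name) || count >= limit {
					return nil
				}
			}
		}
	}
	return nil
}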
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index 6b0ad58b9..2e4491515 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -148,19 +148,18 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful
return
}
-func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
+func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix))
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
if err != nil {
- return nil, err
+ return lastFileName, err
}
- var entries []*filer.Entry
scanner := store.Client.Scan(scan)
defer scanner.Close()
for {
@@ -169,7 +168,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
break
}
if err != nil {
- return entries, err
+ return lastFileName, err
}
if len(res.Cells) == 0 {
continue
@@ -186,6 +185,8 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
continue
}
+ lastFileName = fileName
+
value := cell.Value
if fileName == startFileName && !includeStartFile {
@@ -204,10 +205,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, nil
+ return lastFileName, nil
}
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go
index 26bf763e2..990e55a24 100644
--- a/weed/filer/hbase/hbase_store_kv.go
+++ b/weed/filer/hbase/hbase_store_kv.go
@@ -7,9 +7,10 @@ import (
"time"
)
-const(
+const (
COLUMN_NAME = "a"
)
+
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return store.doPut(ctx, store.cfKv, key, value, 0)
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index b879f3a6e..f0ae64769 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -107,7 +107,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
return nil, filer_pb.ErrNotFound
}
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
@@ -162,16 +162,19 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
- iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {
@@ -181,26 +184,29 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index c5bfb8474..9c342605e 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -2,9 +2,11 @@ package leveldb
import (
"context"
+ "fmt"
"io/ioutil"
"os"
"testing"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -49,14 +51,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
@@ -86,3 +88,28 @@ func TestEmptyRoot(t *testing.T) {
}
}
+
+func BenchmarkInsertEntry(b *testing.B) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+ defer os.RemoveAll(dir)
+ store := &LevelDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ entry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ },
+ }
+ store.InsertEntry(ctx, entry)
+ }
+}
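[Annotation] The new BenchmarkInsertEntry gives a quick allocation profile of entry encoding plus a LevelDB put. It should run with the standard toolchain, e.g. go test -bench=BenchmarkInsertEntry ./weed/filer/leveldb from the repository root; b.ReportAllocs() already adds the allocation columns without needing -benchmem.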
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 4b41554b9..965721460 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -115,7 +115,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
return nil, filer_pb.ErrNotFound
}
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
@@ -171,15 +171,17 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil
}
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount)
- lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart, _ = genDirectoryKeyPrefix(dirPath, startFileName, store.dbCount)
+ }
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
@@ -191,15 +193,16 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fu
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
@@ -208,11 +211,13 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fu
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index 22c0d6052..495c73fdd 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
new file mode 100644
index 000000000..24e00edc7
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -0,0 +1,375 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "github.com/syndtr/goleveldb/leveldb"
+ leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+ "io"
+ "os"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DEFAULT = "_main"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &LevelDB3Store{})
+}
+
+type LevelDB3Store struct {
+ dir string
+ dbs map[string]*leveldb.DB
+ dbsLock sync.RWMutex
+}
+
+func (store *LevelDB3Store) GetName() string {
+ return "leveldb3"
+}
+
+func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *LevelDB3Store) initialize(dir string) (err error) {
+ glog.Infof("filer store leveldb3 dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+ store.dir = dir
+
+ db, loadDbErr := store.loadDB(DEFAULT)
+ if loadDbErr != nil {
+ return loadDbErr
+ }
+ store.dbs = make(map[string]*leveldb.DB)
+ store.dbs[DEFAULT] = db
+
+ return
+}
+
+func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+ if name != DEFAULT {
+ opts = &opt.Options{
+ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+ }
+
+ dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
+ os.MkdirAll(dbFolder, 0755)
+ db, dbErr := leveldb.OpenFile(dbFolder, opts)
+ if leveldb_errors.IsCorrupted(dbErr) {
+ db, dbErr = leveldb.RecoverFile(dbFolder, opts)
+ }
+ if dbErr != nil {
+ glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
+ return nil, dbErr
+ }
+ return db, nil
+}
+
+func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
+
+ store.dbsLock.RLock()
+
+ defaultDB := store.dbs[DEFAULT]
+ if !strings.HasPrefix(string(fullpath), "/buckets/") {
+ store.dbsLock.RUnlock()
+ return defaultDB, DEFAULT, fullpath, nil
+ }
+
+ // detect bucket
+ bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 && !isForChildren {
+ store.dbsLock.RUnlock()
+ return defaultDB, DEFAULT, fullpath, nil
+ }
+ bucket := bucketAndObjectKey
+ shortPath := weed_util.FullPath("/")
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
+ }
+
+ if db, found := store.dbs[bucket]; found {
+ store.dbsLock.RUnlock()
+ return db, bucket, shortPath, nil
+ }
+
+ store.dbsLock.RUnlock()
+ // upgrade to write lock
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ // double check after getting the write lock
+ if db, found := store.dbs[bucket]; found {
+ return db, bucket, shortPath, nil
+ }
+
+ // create db
+ db, err := store.loadDB(bucket)
+ if err != nil {
+ return nil, bucket, shortPath, err
+ }
+ store.dbs[bucket] = db
+
+ return db, bucket, shortPath, nil
+}
+
+func (store *LevelDB3Store) closeDB(bucket string) {
+
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if db, found := store.dbs[bucket]; found {
+ db.Close()
+ delete(store.dbs, bucket)
+ }
+
+}
+
+func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ db, _, shortPath, err := store.findDB(entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
+ err = db.Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+
+ db, _, shortPath, err := store.findDB(fullpath, false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ data, err := db.Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+
+ db, _, shortPath, err := store.findDB(fullpath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ err = db.Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+
+ db, bucket, shortPath, err := store.findDB(fullpath, true)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ if bucket != DEFAULT && shortPath == "/" {
+ store.closeDB(bucket)
+ if bucket != "" { // just to make sure
+ os.RemoveAll(store.dir + "/" + bucket)
+ }
+ return nil
+ }
+
+ directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
+
+ batch := new(leveldb.Batch)
+
+ iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ db, _, shortPath, err := store.findDB(dirPath, true)
+ if err != nil {
+ return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
+ }
+
+ directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
+ }
+
+ iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !includeStartFile {
+ continue
+ }
+ lastFileName = fileName
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ iter.Release()
+
+ return lastFileName, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func (store *LevelDB3Store) Shutdown() {
+ for _, db := range store.dbs {
+ db.Close()
+ }
+}
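[Annotation] The leveldb3 key scheme is worth spelling out: genKey produces md5(directory) followed by the file name, so every child of a directory shares one fixed 16-byte prefix, getNameFromKey recovers the name by slicing at md5.Size, and a directory listing is simply a range scan from genDirectoryKeyPrefix. A worked example:

package main

import (
	"crypto/md5"
	"fmt"
)

func main() {
	dir, name := "/home/chris", "file1.jpg"
	sum := md5.Sum([]byte(dir))
	key := append(sum[:], []byte(name)...) // genKey's layout: md5(dir) || fileName
	fmt.Printf("prefix=%x name=%s\n", key[:md5.Size], key[md5.Size:])
	// all children of /home/chris share the same 16-byte prefix, so a range
	// scan starting at that prefix enumerates exactly this directory
}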
diff --git a/weed/filer/leveldb3/leveldb3_store_kv.go b/weed/filer/leveldb3/leveldb3_store_kv.go
new file mode 100644
index 000000000..18d782b80
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_kv.go
@@ -0,0 +1,46 @@
+package leveldb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+func (store *LevelDB3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ err = store.dbs[DEFAULT].Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ value, err = store.dbs[DEFAULT].Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer.ErrKvNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get: %v", err)
+ }
+
+ return
+}
+
+func (store *LevelDB3Store) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ err = store.dbs[DEFAULT].Delete(key, nil)
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
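[Annotation] findDB in leveldb3_store.go above opens per-bucket databases lazily with double-checked locking: a read lock covers the common hit, and the write lock re-checks the map before opening, so two concurrent requests for a new bucket cannot both call loadDB. The idiom in isolation — lazyPool and the open callback are stand-ins, not patch code:

import (
	"sync"

	"github.com/syndtr/goleveldb/leveldb"
)

type lazyPool struct {
	mu  sync.RWMutex
	dbs map[string]*leveldb.DB
}

func (p *lazyPool) get(bucket string, open func(string) (*leveldb.DB, error)) (*leveldb.DB, error) {
	p.mu.RLock()
	if db, found := p.dbs[bucket]; found {
		p.mu.RUnlock()
		return db, nil
	}
	p.mu.RUnlock()

	p.mu.Lock()
	defer p.mu.Unlock()
	if db, found := p.dbs[bucket]; found {
		return db, nil // another goroutine opened it between the two locks
	}
	db, err := open(bucket)
	if err != nil {
		return nil, err
	}
	p.dbs[bucket] = db
	return db, nil
}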
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
new file mode 100644
index 000000000..53b0e927f
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDB3Store{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking the root directory
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDB3Store{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking the root directory
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index d20c6477a..1ef5056f4 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -178,14 +178,14 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut
return nil
}
-func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
- if inclusive {
+ var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}
+ if includeStartFile {
where["name"] = bson.M{
"$gte": startFileName,
}
@@ -197,26 +197,30 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut
var data Model
err := cur.Decode(&data)
if err != nil && err != mongo.ErrNoDocuments {
- return nil, err
+ return lastFileName, err
}
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), data.Name),
+ FullPath: util.NewFullPath(string(dirPath), data.Name),
}
+ lastFileName = data.Name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
+
}
if err := cur.Close(ctx); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- return entries, err
+ return lastFileName, err
}
func (store *MongodbStore) Shutdown() {
diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go
new file mode 100644
index 000000000..057484c37
--- /dev/null
+++ b/weed/filer/mysql/mysql_sql_gen.go
@@ -0,0 +1,52 @@
+package mysql
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+type SqlGenMysql struct {
+ CreateTableSqlTemplate string
+ DropTableSqlTemplate string
+}
+
+var (
+ _ = abstract_sql.SqlGenerator(&SqlGenMysql{})
+)
+
+func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
+ return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
+ return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
+ return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDropTable(bucket string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+}
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 5bc132980..686628740 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -3,8 +3,9 @@ package mysql
import (
"database/sql"
"fmt"
-
"github.com/chrislusf/seaweedfs/weed/filer"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/go-sql-driver/mysql"
@@ -35,20 +36,19 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
configuration.GetString(prefix+"database"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
configuration.GetBool(prefix+"interpolateParams"),
)
}
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
- interpolateParams bool) (err error) {
- //
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
- store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen,
+ maxLifetimeSeconds int, interpolateParams bool) (err error) {
+
+ store.SupportBucketTable = false
+ store.SqlGenerator = &SqlGenMysql{
+ CreateTableSqlTemplate: "",
+ DropTableSqlTemplate: "drop table %s",
+ }
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
if interpolateParams {
@@ -65,6 +65,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
new file mode 100644
index 000000000..15216b651
--- /dev/null
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -0,0 +1,82 @@
+package mysql2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &MysqlStore2{})
+}
+
+type MysqlStore2 struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *MysqlStore2) GetName() string {
+ return "mysql2"
+}
+
+func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
+ configuration.GetBool(prefix+"interpolateParams"),
+ )
+}
+
+func (store *MysqlStore2) initialize(createTable, user, password, hostname string, port int, database string, maxIdle, maxOpen,
+ maxLifetimeSeconds int, interpolateParams bool) (err error) {
+
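+ // with SupportBucketTable, each bucket gets its own table; the user-supplied
+ // createTable template is instantiated per table, with %s replaced by the table name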
+ store.SupportBucketTable = true
+ store.SqlGenerator = &mysql.SqlGenMysql{
+ CreateTableSqlTemplate: createTable,
+ DropTableSqlTemplate: "drop table %s",
+ }
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ sqlUrl += "&interpolateParams=true"
+ }
+
+ var dbErr error
+ store.DB, dbErr = sql.Open("mysql", sqlUrl)
+ if dbErr != nil {
+ // sql.Open returns a nil *sql.DB on error, so there is nothing to close
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, dbErr)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go
new file mode 100644
index 000000000..284cf254b
--- /dev/null
+++ b/weed/filer/postgres/postgres_sql_gen.go
@@ -0,0 +1,53 @@
+package postgres
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ _ "github.com/lib/pq"
+)
+
+type SqlGenPostgres struct {
+ CreateTableSqlTemplate string
+ DropTableSqlTemplate string
+}
+
+var (
+ _ = abstract_sql.SqlGenerator(&SqlGenPostgres{})
+)
+
+func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
+ return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
+ return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
+ return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+}
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
index 2325568fe..27c6278c7 100644
--- a/weed/filer/postgres/postgres_store.go
+++ b/weed/filer/postgres/postgres_store.go
@@ -33,21 +33,20 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"hostname"),
configuration.GetInt(prefix+"port"),
configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"schema"),
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
)
}
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
+func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
- store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
+ store.SupportBucketTable = false
+ store.SqlGenerator = &SqlGenPostgres{
+ CreateTableSqlTemplate: "",
+ DropTableSqlTemplate: "drop table %s",
+ }
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
@@ -59,6 +58,9 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
if database != "" {
sqlUrl += " dbname=" + database
}
+ if schema != "" {
+ sqlUrl += " search_path=" + schema
+ }
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
new file mode 100644
index 000000000..82552376f
--- /dev/null
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -0,0 +1,87 @@
+package postgres2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/lib/pq"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &PostgresStore2{})
+}
+
+type PostgresStore2 struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *PostgresStore2) GetName() string {
+ return "postgres2"
+}
+
+func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"schema"),
+ configuration.GetString(prefix+"sslmode"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ )
+}
+
+func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
+
+ store.SupportBucketTable = true
+ store.SqlGenerator = &postgres.SqlGenPostgres{
+ CreateTableSqlTemplate: createTable,
+ DropTableSqlTemplate: "drop table %s",
+ }
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
+ if user != "" {
+ sqlUrl += " user=" + user
+ }
+ if password != "" {
+ sqlUrl += " password=" + password
+ }
+ if database != "" {
+ sqlUrl += " dbname=" + database
+ }
+ if schema != "" {
+ sqlUrl += " search_path=" + schema
+ }
+ var dbErr error
+ store.DB, dbErr = sql.Open("postgres", sqlUrl)
+ if dbErr != nil {
+ // sql.Open returns a nil *sql.DB on error, so there is nothing to close
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, dbErr)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 6193dbd45..307224f35 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -18,7 +18,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
@@ -31,9 +31,7 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
-type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
-
-func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex
@@ -109,7 +107,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
defer c.readerLock.Unlock()
glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
- return c.doReadAt(p[n:], offset+int64(n))
+ return c.doReadAt(p, offset)
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
diff --git a/weed/filer/redis/redis_cluster_store.go b/weed/filer/redis/redis_cluster_store.go
index 8af94ee55..9572058a8 100644
--- a/weed/filer/redis/redis_cluster_store.go
+++ b/weed/filer/redis/redis_cluster_store.go
@@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
@@ -20,8 +20,8 @@ func (store *RedisClusterStore) GetName() string {
func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
diff --git a/weed/filer/redis/redis_store.go b/weed/filer/redis/redis_store.go
index e152457ed..665352a63 100644
--- a/weed/filer/redis/redis_store.go
+++ b/weed/filer/redis/redis_store.go
@@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 0de9924a3..30d11a7f4 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -44,7 +44,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
value = util.MaybeGzipData(value)
}
- _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
+ _, err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@@ -52,7 +52,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
dir, name := entry.FullPath.DirAndName()
if name != "" {
- _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
@@ -68,7 +68,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
- data, err := store.Client.Get(string(fullpath)).Result()
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@@ -90,7 +90,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@@ -98,7 +98,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
dir, name := fullpath.DirAndName()
if name != "" {
- _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -109,14 +109,14 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete folder %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
+ _, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -125,17 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- dirListKey := genDirectoryListKey(string(fullpath))
- members, err := store.Client.SMembers(dirListKey).Result()
+ dirListKey := genDirectoryListKey(string(dirPath))
+ members, err := store.Client.SMembers(ctx, dirListKey).Result()
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// skip
@@ -144,7 +143,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
for _, m := range members {
if strings.Compare(m, startFileName) >= 0 {
if m == startFileName {
- if inclusive {
+ if includeStartFile {
t = append(t, m)
}
} else {
@@ -161,29 +160,35 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
})
// limit
- if limit < len(members) {
+ if limit < int64(len(members)) {
members = members[:limit]
}
// fetch entry meta
for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.SRem(dirListKey, fileName).Result()
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.SRem(ctx, dirListKey, fileName).Result()
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/redis/universal_redis_store_kv.go b/weed/filer/redis/universal_redis_store_kv.go
index 0fc12c631..ad6e389ed 100644
--- a/weed/filer/redis/universal_redis_store_kv.go
+++ b/weed/filer/redis/universal_redis_store_kv.go
@@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- _, err = store.Client.Set(string(key), value, 0).Result()
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@@ -21,7 +21,7 @@ func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value [
func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- data, err := store.Client.Get(string(key)).Result()
+ data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@@ -32,7 +32,7 @@ func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedisStore) KvDelete(ctx context.Context, key []byte) (err error) {
- _, err = store.Client.Del(string(key)).Result()
+ _, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
index c7742bb19..22d09da25 100644
--- a/weed/filer/redis2/redis_cluster_store.go
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
@@ -20,8 +20,8 @@ func (store *RedisCluster2Store) GetName() string {
func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index da404ed4c..8eb97e374 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 00d02ea14..aab3d1f4a 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -5,7 +5,7 @@ import (
"fmt"
"time"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -56,7 +56,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
value = util.MaybeGzipData(value)
}
- if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -66,7 +66,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
if name != "" {
- if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
+ if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
@@ -81,7 +81,7 @@ func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
- data, err := store.Client.Get(string(fullpath)).Result()
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@@ -103,12 +103,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(genDirectoryListKey(string(fullpath))).Result()
+ _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -118,7 +118,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
return nil
}
if name != "" {
- _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
@@ -133,14 +133,14 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
- members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ members, err := store.Client.ZRange(ctx, genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
+ _, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
@@ -149,45 +149,50 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
-func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- dirListKey := genDirectoryListKey(string(fullpath))
+ dirListKey := genDirectoryListKey(string(dirPath))
start := int64(0)
if startFileName != "" {
- start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
- if !inclusive {
+ start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result()
+ if !includeStartFile {
start++
}
}
- members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
+ members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.ZRem(dirListKey, fileName).Result()
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/redis2/universal_redis_store_kv.go b/weed/filer/redis2/universal_redis_store_kv.go
index 658491ddf..bde994dc9 100644
--- a/weed/filer/redis2/universal_redis_store_kv.go
+++ b/weed/filer/redis2/universal_redis_store_kv.go
@@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- _, err = store.Client.Set(string(key), value, 0).Result()
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@@ -21,7 +21,7 @@ func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value
func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- data, err := store.Client.Get(string(key)).Result()
+ data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@@ -32,7 +32,7 @@ func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedis2Store) KvDelete(ctx context.Context, key []byte) (err error) {
- _, err = store.Client.Del(string(key)).Result()
+ _, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
diff --git a/weed/filer/rocksdb/README.md b/weed/filer/rocksdb/README.md
new file mode 100644
index 000000000..6bae6d34e
--- /dev/null
+++ b/weed/filer/rocksdb/README.md
@@ -0,0 +1,41 @@
+# Prepare the compilation environment on Linux
+- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
+- sudo apt-get update -qq
+- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq
+- export CXX="g++-6" CC="gcc-6"
+
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+
+# Prepare the compilation environment on macOS
+```
+brew install snappy
+```
+
+# Install RocksDB
+```
+ export ROCKSDB_HOME=/Users/chris/dev/rocksdb
+
+ git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME
+ pushd $ROCKSDB_HOME
+ make clean
+ make install-static
+ popd
+```
+
+# Install gorocksdb
+
+```
+export CGO_CFLAGS="-I$ROCKSDB_HOME/include"
+export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
+
+go get github.com/tecbot/gorocksdb
+```
+# Compile with RocksDB
+
+```
+cd ~/go/src/github.com/chrislusf/seaweedfs/weed
+go install -tags rocksdb
+```
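+
+# Use the RocksDB filer store
+
+A minimal sketch of enabling the store in `filer.toml` (the `[rocksdb]`
+section name matches the store's `GetName()` and `dir` is the option read by
+its `Initialize`; the path below is just an example):
+
+```
+[rocksdb]
+enabled = true
+dir = "./filerrdb"
+```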
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
new file mode 100644
index 000000000..70c301725
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -0,0 +1,302 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RocksDBStore{})
+}
+
+type options struct {
+ opt *gorocksdb.Options
+ ro *gorocksdb.ReadOptions
+ wo *gorocksdb.WriteOptions
+}
+
+func (opt *options) init() {
+ opt.opt = gorocksdb.NewDefaultOptions()
+ opt.ro = gorocksdb.NewDefaultReadOptions()
+ opt.wo = gorocksdb.NewDefaultWriteOptions()
+}
+
+func (opt *options) close() {
+ opt.opt.Destroy()
+ opt.ro.Destroy()
+ opt.wo.Destroy()
+}
+
+type RocksDBStore struct {
+ path string
+ db *gorocksdb.DB
+ options
+}
+
+func (store *RocksDBStore) GetName() string {
+ return "rocksdb"
+}
+
+func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *RocksDBStore) initialize(dir string) (err error) {
+ glog.Infof("filer store rocksdb dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+ store.options.init()
+ store.opt.SetCreateIfMissing(true)
+ // reduce write amplification
+ // also avoid expired data stored in highest level never get compacted
+ store.opt.SetLevelCompactionDynamicLevelBytes(true)
+ store.opt.SetCompactionFilter(NewTTLFilter())
+ // store.opt.SetMaxBackgroundCompactions(2)
+
+ store.db, err = gorocksdb.OpenDb(store.opt, dir)
+
+ return
+}
+
+func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.db.Put(store.wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+ data, err := store.db.Get(store.ro, key)
+
+ // a missing key comes back as a slice with nil data, not as an error
+ if data == nil || data.Data() == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+ defer data.Free()
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data.Data())
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ err = store.db.Delete(store.wo, key)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ batch := gorocksdb.NewWriteBatch()
+ defer batch.Destroy()
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
+ ro.SetFillCache(false)
+
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
+ batch.Delete(key)
+ return true
+ })
+ if err != nil {
+ return fmt.Errorf("delete list %s : %v", fullpath, err)
+ }
+
+ err = store.db.Write(store.wo, batch)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
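+// enumerate walks keys in order starting at prefix, or at lastKey when
+// resuming a listing (skipping lastKey itself unless includeLastKey is set),
+// and calls fn for each key/value until fn returns false, the key no longer
+// matches prefix, or limit entries have been visited (limit <= 0 means no limit).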
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) {
+
+ if len(lastKey) == 0 {
+ iter.Seek(prefix)
+ } else {
+ iter.Seek(lastKey)
+ if !includeLastKey {
+ if iter.Valid() {
+ if bytes.Equal(iter.Key().Data(), lastKey) {
+ iter.Next()
+ }
+ }
+ }
+ }
+
+ i := int64(0)
+ for ; iter.Valid(); iter.Next() {
+
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+
+ key := iter.Key().Data()
+
+ if !bytes.HasPrefix(key, prefix) {
+ break
+ }
+
+ ret := fn(key, iter.Value().Data())
+
+ if !ret {
+ break
+ }
+
+ }
+
+ if err := iter.Err(); err != nil {
+ return fmt.Errorf("prefix scan iterator: %v", err)
+ }
+ return nil
+}
+
+func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
+ ro.SetFillCache(false)
+
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ return true
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
+ }
+ lastFileName = fileName
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return false
+ }
+ if !eachEntryFunc(entry) {
+ return false
+ }
+ return true
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+
+ return lastFileName, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+ return string(key[md5.Size:])
+}
+
+// hash directory
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func (store *RocksDBStore) Shutdown() {
+ store.db.Close()
+ store.options.close()
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go
new file mode 100644
index 000000000..cf1214d5b
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_kv.go
@@ -0,0 +1,47 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ err = store.db.Put(store.wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ value, err = store.db.GetBytes(store.ro, key)
+
+ if value == nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get: %v", err)
+ }
+
+ return
+}
+
+func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ err = store.db.Delete(store.wo, key)
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
new file mode 100644
index 000000000..439663524
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -0,0 +1,117 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking the root directory
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking the root directory
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func BenchmarkInsertEntry(b *testing.B) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ entry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ },
+ }
+ store.InsertEntry(ctx, entry)
+ }
+}
diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go
new file mode 100644
index 000000000..faed22310
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_ttl.go
@@ -0,0 +1,40 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "time"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
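+// TTLFilter lets RocksDB reclaim expired filer entries during compaction:
+// each value is decoded and removed once Crtime + TtlSec is in the past.
+// Level-0 files are skipped so flushes are not slowed down by decoding.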
+type TTLFilter struct {
+ skipLevel0 bool
+}
+
+func NewTTLFilter() gorocksdb.CompactionFilter {
+ return &TTLFilter{
+ skipLevel0: true,
+ }
+}
+
+func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
+ // decode could be slow, causing write stall
+ // level >0 sst can run compaction in parallel
+ if !t.skipLevel0 || level > 0 {
+ entry := filer.Entry{}
+ if err := entry.DecodeAttributesAndChunks(val); err == nil {
+ if entry.TtlSec > 0 &&
+ entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
+ return true, nil
+ }
+ }
+ }
+ return false, val
+}
+
+func (t *TTLFilter) Name() string {
+ return "TTLFilter"
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index cffdc8303..f0042a0ff 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -13,16 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
- chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
+ chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
- urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
+ urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@@ -86,7 +86,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index a8481a435..d86d92ac9 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -6,6 +6,7 @@ import (
"math"
"os"
"strings"
+ "syscall"
"time"
"github.com/seaweedfs/fuse"
@@ -127,28 +128,59 @@ func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.N
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+ request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, req.Flags&fuse.OpenExclusive != 0)
+
+ if err != nil {
+ return nil, nil, err
+ }
+ var node fs.Node
+ if request.Entry.IsDirectory {
+ node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
+ return node, nil, nil
+ }
+
+ node = dir.newFile(req.Name, request.Entry)
+ file := node.(*File)
+ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
+ return file, fh, nil
+
+}
+
+func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
+
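+ // unlike Create, Mknod only records the entry and returns no open handle;
+ // it never requests O_EXCL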
+ request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
+
+ if err != nil {
+ return nil, err
+ }
+ var node fs.Node
+ node = dir.newFile(req.Name, request.Entry)
+ return node, nil
+}
+
+func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: req.Mode&os.ModeDir > 0,
+ Name: name,
+ IsDirectory: mode&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
+ FileMode: uint32(mode &^ dir.wfs.option.Umask),
+ Uid: uid,
+ Gid: gid,
Collection: dir.wfs.option.Collection,
Replication: dir.wfs.option.Replication,
TtlSec: dir.wfs.option.TtlSec,
},
},
- OExcl: req.Flags&fuse.OpenExclusive != 0,
+ OExcl: exclusive,
Signatures: []int32{dir.wfs.signature},
}
- glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
+ glog.V(1).Infof("create %s/%s", dir.FullPath(), name)
- if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -157,41 +189,15 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
- glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), name, err)
return fuse.EIO
}
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
- }); err != nil {
- return nil, nil, err
- }
- var node fs.Node
- if request.Entry.IsDirectory {
- node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
- return node, nil, nil
- }
-
- node = dir.newFile(req.Name, request.Entry)
- file := node.(*File)
- fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
- return file, fh, nil
-
-}
-
-func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
- if req.Mode&os.ModeNamedPipe != 0 {
- glog.V(1).Infof("mknod named pipe %s", req.String())
- return nil, fuse.ENOSYS
- }
- if req.Mode&req.Mode&os.ModeSocket != 0 {
- glog.V(1).Infof("mknod socket %s", req.String())
- return nil, fuse.ENOSYS
- }
- // not going to support mknod for normal files either
- glog.V(1).Infof("mknod %s", req.String())
- return nil, fuse.ENOSYS
+ })
+ return request, err
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
@@ -308,7 +314,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
ret = append(ret, dirent)
} else {
- dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
+ dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))}
ret = append(ret, dirent)
}
return nil
@@ -319,17 +325,37 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry.ToProtoEntry(), false)
+ return true
+ })
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
}
- for _, cachedEntry := range listedEntries {
- processEachEntryFn(cachedEntry.ToProtoEntry(), false)
- }
return
}
+func findFileType(mode uint16) fuse.DirentType {
+ switch mode & (syscall.S_IFMT & 0xffff) {
+ case syscall.S_IFSOCK:
+ return fuse.DT_Socket
+ case syscall.S_IFLNK:
+ return fuse.DT_Link
+ case syscall.S_IFREG:
+ return fuse.DT_File
+ case syscall.S_IFBLK:
+ return fuse.DT_Block
+ case syscall.S_IFDIR:
+ return fuse.DT_Dir
+ case syscall.S_IFCHR:
+ return fuse.DT_Char
+ case syscall.S_IFIFO:
+ return fuse.DT_FIFO
+ }
+ return fuse.DT_File
+}
+
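
The new findFileType helper masks the raw mode word down to its file-type bits before matching. A minimal standalone sketch of the same masking idea, with illustrative names rather than the filesys types (assumes a Unix syscall package):

package main

import (
	"fmt"
	"syscall"
)

// direntType mirrors findFileType above: keep only the file-type bits
// (S_IFMT) of a raw mode word and map each type to a readable name.
func direntType(mode uint16) string {
	switch mode & syscall.S_IFMT {
	case syscall.S_IFSOCK:
		return "socket"
	case syscall.S_IFLNK:
		return "symlink"
	case syscall.S_IFDIR:
		return "directory"
	case syscall.S_IFBLK:
		return "block device"
	case syscall.S_IFCHR:
		return "char device"
	case syscall.S_IFIFO:
		return "fifo"
	}
	return "regular file" // S_IFREG and anything unrecognized
}

func main() {
	fmt.Println(direntType(syscall.S_IFDIR | 0755)) // directory
	fmt.Println(direntType(syscall.S_IFREG | 0644)) // regular file
}
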
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
if !req.Dir {
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index c273eec8a..d5f6cc38e 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -72,7 +72,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
}
totalRead, err := fh.readFromChunks(buff, req.Offset)
- if err == nil {
+ if err == nil || err == io.EOF {
maxStop := fh.readFromDirtyPages(buff, req.Offset)
totalRead = max(maxStop-req.Offset, totalRead)
}
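
The Read change treats io.EOF as a readable result rather than a failure, so dirty pages can still be overlaid on a short tail read. A hedged sketch of that pattern with stand-in types; iotest is used only to force a data-plus-EOF read:

package main

import (
	"fmt"
	"io"
	"strings"
	"testing/iotest"
)

// readThenOverlay sketches the EOF handling above: a read that returns io.EOF
// alongside valid bytes is not an error, so the dirty-page overlay still runs
// and may extend the result past what the chunks provided.
func readThenOverlay(r io.Reader, buf []byte, overlay func(n int) int) (int, error) {
	n, err := r.Read(buf)
	if err == nil || err == io.EOF {
		if m := overlay(n); m > n {
			n = m
		}
		return n, nil
	}
	return n, err
}

func main() {
	// DataErrReader makes the final Read return its bytes together with io.EOF.
	r := iotest.DataErrReader(strings.NewReader("abc"))
	buf := make([]byte, 16)
	n, err := readThenOverlay(r, buf, func(n int) int { return n })
	fmt.Println(n, err, string(buf[:n])) // 3 <nil> abc
}
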
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 4b282253d..f4e4d7d6e 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -117,22 +117,22 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
return mc.localStore.DeleteEntry(ctx, fp)
}
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
mc.RLock()
defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
- return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ return fmt.Errorf("unsynchronized dir: %v", dirPath)
}
- entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return err
}
- return entries, err
+ return err
}
func (mc *MetaCache) Shutdown() {
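
The listing APIs move from returning a slice to pushing entries through a callback. A small self-contained sketch of the visitor-style API, using hypothetical Entry and store types rather than the real filer ones:

package main

import "fmt"

// Entry and ListEachEntryFunc are stand-ins for the filer types; the callback
// returns false to stop the iteration early, as in the new API above.
type Entry struct{ Name string }

type ListEachEntryFunc func(*Entry) bool

type store struct{ entries []*Entry }

// ListDirectoryEntries pushes entries to fn instead of returning a slice, so
// callers hold at most one entry in memory regardless of directory size.
func (s *store) ListDirectoryEntries(limit int64, fn ListEachEntryFunc) (lastName string, err error) {
	for _, e := range s.entries {
		if limit <= 0 {
			break
		}
		lastName = e.Name
		limit--
		if !fn(e) {
			break
		}
	}
	return lastName, nil
}

func main() {
	s := &store{entries: []*Entry{{"a"}, {"b"}, {"c"}}}
	last, _ := s.ListDirectoryEntries(2, func(e *Entry) bool {
		fmt.Println(e.Name)
		return true
	})
	fmt.Println("stopped at:", last) // stopped at: b
}
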
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 4089cea28..1ca3b16d5 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -19,6 +19,9 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
util.Retry("ReadDirAllEntries", func() error {
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if IsHiddenSystemEntry(string(dirPath), entry.Name()) {
+ return nil
+ }
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
glog.V(0).Infof("read %s: %v", entry.FullPath, err)
return err
@@ -38,3 +41,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
return
})
}
+
+func IsHiddenSystemEntry(dir, name string) bool {
+ return dir == "/" && name == "topics"
+}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index df4052096..3021473e5 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -101,20 +101,21 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return nil
}
- if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- if err != io.EOF {
- // println("stopping from persisted logs", err.Error())
- return err
- }
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
// fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
for {
+
+ if err = broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ if err != io.EOF {
+ // println("stopping from persisted logs", err.Error())
+ return err
+ }
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
lastReadTime, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@@ -122,6 +123,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return isConnected
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
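
Moving the persisted-log read inside the loop lets a subscriber fall back to disk whenever the in-memory buffer has flushed past its cursor. A runnable sketch of that control flow, with stub readers standing in for readPersistedLogBuffer and LoopProcessLogData:

package main

import (
	"errors"
	"fmt"
	"time"
)

// errResumeFromDisk stands in for log_buffer.ResumeFromDiskError.
var errResumeFromDisk = errors.New("resume from disk")

// subscribeLoop sketches the reordering above: the persisted-log read now sits
// inside the loop, so when the in-memory read reports that its window no
// longer covers the cursor, the loop simply continues and replays from disk.
func subscribeLoop(readDisk, readMemory func(time.Time) (time.Time, error), last time.Time, rounds int) error {
	for i := 0; i < rounds; i++ {
		t, err := readDisk(last)
		if err != nil {
			return err
		}
		last = t

		t, err = readMemory(last)
		last = t
		if errors.Is(err, errResumeFromDisk) {
			continue // buffer flushed past the cursor; catch up from disk
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	calls := 0
	readDisk := func(t time.Time) (time.Time, error) { return t.Add(time.Second), nil }
	readMemory := func(t time.Time) (time.Time, error) {
		calls++
		if calls == 1 {
			return t, errResumeFromDisk // first in-memory read misses
		}
		return t.Add(time.Second), nil
	}
	fmt.Println(subscribeLoop(readDisk, readMemory, time.Unix(0, 0), 2))
}
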
diff --git a/weed/notification/configuration.go b/weed/notification/configuration.go
index 36211692c..541a453e9 100644
--- a/weed/notification/configuration.go
+++ b/weed/notification/configuration.go
@@ -4,7 +4,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
- "github.com/spf13/viper"
)
type MessageQueue interface {
@@ -21,7 +20,7 @@ var (
Queue MessageQueue
)
-func LoadConfiguration(config *viper.Viper, prefix string) {
+func LoadConfiguration(config *util.ViperProxy, prefix string) {
if config == nil {
return
@@ -43,7 +42,7 @@ func LoadConfiguration(config *viper.Viper, prefix string) {
}
-func validateOneEnabledQueue(config *viper.Viper) {
+func validateOneEnabledQueue(config *util.ViperProxy) {
enabledQueue := ""
for _, queue := range MessageQueues {
if config.GetBool(queue.GetName() + ".enabled") {
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 25843c892..e785b68a9 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -32,7 +32,7 @@ type FilePart struct {
type SubmitResult struct {
FileName string `json:"fileName,omitempty"`
- FileUrl string `json:"fileUrl,omitempty"`
+ FileUrl string `json:"url,omitempty"`
Fid string `json:"fid,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
@@ -69,6 +69,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
+ file.Ttl = ttl
results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index a58c8f296..9c0e4176f 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -3,6 +3,7 @@ package filersink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc"
@@ -206,7 +207,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
-func compareChunks(lookupFileIdFn filer.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
if aErr != nil {
return nil, nil, aErr
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index c305fee6f..b8af6381a 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -156,7 +156,36 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
// check whether the request has valid access keys
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) {
- identity, s3Err := iam.authUser(r)
+ var identity *Identity
+ var s3Err s3err.ErrorCode
+ var found bool
+ switch getRequestAuthType(r) {
+ case authTypeStreamingSigned:
+ return identity, s3err.ErrNone
+ case authTypeUnknown:
+ glog.V(3).Infof("unknown auth type")
+ return identity, s3err.ErrAccessDenied
+ case authTypePresignedV2, authTypeSignedV2:
+ glog.V(3).Infof("v2 auth type")
+ identity, s3Err = iam.isReqAuthenticatedV2(r)
+ case authTypeSigned, authTypePresigned:
+ glog.V(3).Infof("v4 auth type")
+ identity, s3Err = iam.reqSignatureV4Verify(r)
+ case authTypePostPolicy:
+ glog.V(3).Infof("post policy auth type")
+ return identity, s3err.ErrNone
+ case authTypeJWT:
+ glog.V(3).Infof("jwt auth type")
+ return identity, s3err.ErrNotImplemented
+ case authTypeAnonymous:
+ identity, found = iam.lookupAnonymous()
+ if !found {
+ return identity, s3err.ErrAccessDenied
+ }
+ default:
+ return identity, s3err.ErrNotImplemented
+ }
+
if s3Err != s3err.ErrNone {
return identity, s3Err
}
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index f750f6e53..338f82668 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -51,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
- if identity!=nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
+ if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index c1c6e2f89..a2407fced 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -71,7 +71,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
ContinuationToken: continuationToken,
Delimiter: response.Delimiter,
IsTruncated: response.IsTruncated,
- KeyCount: len(response.Contents),
+ KeyCount: len(response.Contents) + len(response.CommonPrefixes),
MaxKeys: response.MaxKeys,
NextContinuationToken: response.NextMarker,
Prefix: response.Prefix,
@@ -238,7 +238,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
}
- if counter >= maxKeys {
+ if counter >= maxKeys + 1 {
isTruncated = true
return
}
@@ -264,8 +264,10 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
}
} else {
var isEmpty bool
- if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
- glog.Errorf("check empty folder %s: %v", dir, err)
+ if !s3a.option.AllowEmptyFolder {
+ if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
+ glog.Errorf("check empty folder %s: %v", dir, err)
+ }
}
if !isEmpty {
eachEntryFn(dir, entry)
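
The KeyCount fix reflects ListObjectsV2 semantics: common prefixes count as returned keys alongside object entries. A one-line illustration (names are hypothetical):

package main

import "fmt"

// keyCount mirrors the fix above: for ListObjectsV2, KeyCount is the number
// of keys returned in the response, which includes CommonPrefixes as well as
// Contents, since both count against MaxKeys.
func keyCount(contents, commonPrefixes []string) int {
	return len(contents) + len(commonPrefixes)
}

func main() {
	fmt.Println(keyCount([]string{"a.txt"}, []string{"dir1/", "dir2/"})) // 3
}
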
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index e4a07a443..4993104ae 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -20,6 +20,7 @@ type S3ApiServerOption struct {
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
+ AllowEmptyFolder bool
}
type S3ApiServer struct {
diff --git a/weed/security/tls.go b/weed/security/tls.go
index 72363768f..b4bf84e2d 100644
--- a/weed/security/tls.go
+++ b/weed/security/tls.go
@@ -3,17 +3,16 @@ package security
import (
"crypto/tls"
"crypto/x509"
+ "github.com/chrislusf/seaweedfs/weed/util"
"io/ioutil"
- "github.com/spf13/viper"
-
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/chrislusf/seaweedfs/weed/glog"
)
-func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption {
+func LoadServerTLS(config *util.ViperProxy, component string) grpc.ServerOption {
if config == nil {
return nil
}
@@ -40,7 +39,7 @@ func LoadServerTLS(config *viper.Viper, component string) grpc.ServerOption {
return grpc.Creds(ta)
}
-func LoadClientTLS(config *viper.Viper, component string) grpc.DialOption {
+func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
if config == nil {
return grpc.WithInsecure()
}
diff --git a/weed/server/common.go b/weed/server/common.go
index 58079032e..cf9547950 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -233,12 +233,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize); err != nil {
+ if err := writeFn(w, 0, totalSize, 0); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -277,9 +277,8 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
- err = writeFn(w, ra.start, ra.length)
+ err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -307,7 +306,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
pw.CloseWithError(e)
return
}
- if e = writeFn(part, ra.start, ra.length); e != nil {
+ if e = writeFn(part, ra.start, ra.length, 0); e != nil {
pw.CloseWithError(e)
return
}
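
Passing httpStatusCode into writeFn defers the header write until the data path is known good. A minimal sketch of the new contract, assuming only the standard library:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// writeRange sketches the new writeFn contract: the status code travels as a
// parameter and is committed only after the seek succeeds, so a failure can
// still surface as a plain HTTP 500 instead of a half-written 206.
func writeRange(w http.ResponseWriter, rs io.ReadSeeker, offset, size int64, httpStatusCode int) error {
	if _, err := rs.Seek(offset, io.SeekStart); err != nil {
		return err // nothing written yet; caller may still call http.Error
	}
	if httpStatusCode != 0 {
		w.WriteHeader(httpStatusCode)
	}
	_, err := io.CopyN(w, rs, size)
	return err
}

func main() {
	rec := httptest.NewRecorder()
	if err := writeRange(rec, strings.NewReader("hello world"), 6, 5, http.StatusPartialContent); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(rec.Code, rec.Body.String()) // 206 world
}
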
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5f1b2d819..b0563d8bd 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -44,7 +44,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
}, nil
}
-func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
glog.V(4).Infof("ListEntries %v", req)
@@ -60,23 +60,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
+ var listErr error
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
-
- if err != nil {
- return err
- }
- if len(entries) == 0 {
- return nil
- }
-
- includeLastFile = false
-
- for _, entry := range entries {
-
- lastFileName = entry.Name()
-
- if err := stream.Send(&filer_pb.ListEntriesResponse{
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
Name: entry.Name(),
IsDirectory: entry.IsDirectory(),
@@ -88,19 +77,28 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
Content: entry.Content,
},
}); err != nil {
- return err
+ return false
}
limit--
if limit == 0 {
- return nil
+ return false
}
- }
+ return true
+ })
- if len(entries) < paginationLimit {
- break
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
}
+ includeLastFile = false
+
}
return nil
@@ -326,7 +324,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
- if err != nil {
+ if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
}
return resp, nil
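
ListEntries now drives the streaming list in pages, resetting the inclusive flag after the first page and stopping when a page comes back empty. A compact sketch of that pagination loop over an in-memory list (all names illustrative):

package main

import "fmt"

// streamPages sketches the rewrite above: keep calling the streaming list
// with the last seen name until a page is empty or the client limit runs out.
func streamPages(list func(startFrom string, inclusive bool, each func(string) bool) (string, error), limit int) ([]string, error) {
	var out []string
	lastName, inclusive := "", true
	for limit > 0 {
		hasEntries := false
		var err error
		lastName, err = list(lastName, inclusive, func(name string) bool {
			hasEntries = true
			out = append(out, name)
			limit--
			return limit > 0
		})
		if err != nil {
			return out, err
		}
		if !hasEntries {
			break
		}
		inclusive = false
	}
	return out, nil
}

func main() {
	data := []string{"a", "b", "c", "d", "e"}
	list := func(startFrom string, inclusive bool, each func(string) bool) (string, error) {
		last, count := startFrom, 0
		for _, name := range data {
			if name < startFrom || (!inclusive && name == startFrom) {
				continue
			}
			if count == 2 { // page size 2
				break
			}
			last = name
			count++
			if !each(name) {
				break
			}
		}
		return last, nil
	}
	fmt.Println(streamPages(list, 4)) // [a b c d] <nil>
}
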
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index fa86737ac..5b68b64de 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
+ entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "")
if err != nil {
return err
}
@@ -90,7 +90,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return err
}
}
- if len(entries) < 1024 {
+ if !hasMore {
break
}
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 3b8ced675..d9f91b125 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -29,16 +29,20 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
+ var processedTsNs int64
+ var err error
for {
+
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
@@ -46,6 +50,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return true
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
@@ -73,19 +80,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- // println("reading from persisted logs ...")
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
- glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ var processedTsNs int64
+ var err error
- // println("reading from in memory logs ...")
for {
+ // println("reading from persisted logs ...")
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ // println("reading from in memory logs ...")
+
lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
@@ -93,6 +104,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
return true
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2da129ab2..22474a5e2 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -26,9 +26,12 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -56,7 +59,7 @@ type FilerOption struct {
Port uint32
recursiveDelete bool
Cipher bool
- CacheToFilerLimit int64
+ SaveToFilerLimit int
Filers []string
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index d55bf7cbb..9fdc03dea 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
"io"
"mime"
"net/http"
+ "net/url"
"path/filepath"
"strconv"
"strings"
@@ -99,6 +100,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.Header().Set(k, string(v))
}
+ // Seaweed custom headers are not visible to Vue or JavaScript
+ seaweedHeaders := []string{}
+ for header := range w.Header() {
+ if strings.HasPrefix(header, "Seaweed-") {
+ seaweedHeaders = append(seaweedHeaders, header)
+ }
+ }
+ seaweedHeaders = append(seaweedHeaders, "Content-Disposition")
+ w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
+
//set tag count
if r.Method == "GET" {
tagCount := 0
@@ -121,6 +132,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
setEtag(w, etag)
filename := entry.Name()
+ filename = url.QueryEscape(filename)
adjustHeaderContentDisposition(w, r, filename)
totalSize := int64(entry.Size())
@@ -146,8 +158,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
if offset+size <= int64(len(entry.Content)) {
+ if httpStatusCode != 0 {
+ w.WriteHeader(httpStatusCode)
+ }
_, err := writer.Write(entry.Content[offset : offset+size])
return err
}
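
The Access-Control-Expose-Headers addition makes the Seaweed-* response headers readable from browser scripts. A sketch of the collection step, using the standard http.Header type:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// exposeHeaders mirrors the handler change above: browsers hide custom
// response headers from scripts unless they are listed in
// Access-Control-Expose-Headers, so every Seaweed-* header is collected and
// exposed, plus Content-Disposition.
func exposeHeaders(h http.Header) {
	exposed := []string{}
	for header := range h {
		if strings.HasPrefix(header, "Seaweed-") {
			exposed = append(exposed, header)
		}
	}
	exposed = append(exposed, "Content-Disposition")
	h.Set("Access-Control-Expose-Headers", strings.Join(exposed, ","))
}

func main() {
	rec := httptest.NewRecorder()
	rec.Header().Set("Seaweed-Tag-Count", "2")
	exposeHeaders(rec.Header())
	fmt.Println(rec.Header().Get("Access-Control-Expose-Headers"))
}
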
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index f303ba1d4..9cf79ab41 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
namePattern := r.FormValue("namePattern")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -44,7 +44,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
return
}
- shouldDisplayLoadMore := len(entries) == limit
if path == "/" {
path = ""
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index eee39152b..237f121fe 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -96,12 +96,6 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
@@ -111,25 +105,26 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
- contentType := ""
+ contentType := r.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
if err != nil {
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
return
}
+func isAppend(r *http.Request) bool {
+ return r.URL.Query().Get("op") == "append"
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -151,26 +146,62 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- glog.V(4).Infoln("saving", path)
- entry := &filer.Entry{
- FullPath: util.FullPath(path),
- Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- Mime: contentType,
- Md5: md5bytes,
- FileSize: uint64(chunkOffset),
- },
- Chunks: fileChunks,
- Content: content,
+ var entry *filer.Entry
+ var mergedChunks []*filer_pb.FileChunk
+ // when it is an append
+ if isAppend(r) {
+ existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if findErr != nil && findErr != filer_pb.ErrNotFound {
+ glog.V(0).Infof("failing to find %s: %v", path, findErr)
+ }
+ entry = existingEntry
+ }
+ if entry != nil {
+ entry.Mtime = time.Now()
+ entry.Md5 = nil
+ // adjust chunk offsets
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ mergedChunks = append(entry.Chunks, fileChunks...)
+ entry.FileSize += uint64(chunkOffset)
+
+ // TODO
+ if len(entry.Content) > 0 {
+ replyerr = fmt.Errorf("append to small file is not supported yet")
+ return
+ }
+
+ } else {
+ glog.V(4).Infoln("saving", path)
+ mergedChunks = fileChunks
+ entry = &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
+ },
+ Content: content,
+ }
}
+ // maybe compact entry chunks
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
+ entry.Chunks = mergedChunks
+
filerResult = &FilerPostResult{
Name: fileName,
Size: chunkOffset,
@@ -189,7 +220,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
+ fs.filer.DeleteChunks(fileChunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -204,7 +235,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
chunkOffset := int64(0)
- var smallContent, content []byte
+ var smallContent []byte
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
@@ -213,6 +244,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
if err != nil {
return nil, nil, 0, err, nil
}
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
+ }
+ }
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -239,8 +277,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return nil, nil, 0, uploadErr, nil
}
- content = data
-
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
break
@@ -260,9 +296,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
}
- if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
- smallContent = content
- }
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
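
For ?op=append, freshly uploaded chunks start at offset zero and must be shifted by the existing file size before merging. A small sketch of that offset arithmetic with a hypothetical chunk type:

package main

import "fmt"

type chunk struct {
	Offset int64
	Size   int64
}

// mergeAppend sketches the append path above: chunks uploaded for an append
// arrive with offsets starting at zero, so they are shifted by the existing
// file size before being concatenated onto the old chunk list.
func mergeAppend(existing []chunk, existingSize int64, uploaded []chunk) ([]chunk, int64) {
	added := int64(0)
	for i := range uploaded {
		uploaded[i].Offset += existingSize
		added += uploaded[i].Size
	}
	return append(existing, uploaded...), existingSize + added
}

func main() {
	old := []chunk{{0, 4}, {4, 4}}
	merged, size := mergeAppend(old, 8, []chunk{{0, 3}})
	fmt.Println(merged, size) // [{0 4} {4 4} {8 3}] 11
}
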
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
new file mode 100644
index 000000000..5fcc7e88f
--- /dev/null
+++ b/weed/server/filer_server_rocksdb.go
@@ -0,0 +1,7 @@
+// +build rocksdb
+
+package weed_server
+
+import (
+ _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb"
+)
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 1c963b39c..1cd4ee21d 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -261,10 +261,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
+ if httpStatusCode != 0 {
+ w.WriteHeader(httpStatusCode)
+ }
_, e = io.CopyN(writer, rs, size)
return e
})
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index 869949a25..ca51ef72f 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -25,7 +25,7 @@ func (c *commandS3Configure) Name() string {
}
func (c *commandS3Configure) Help() string {
- return `<WIP> configure and apply s3 options for each bucket
+ return `configure and apply s3 options for each bucket
# see the current configuration file content
s3.configure
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index daab29621..b8b883be6 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -1,6 +1,7 @@
package backend
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
"os"
"strings"
@@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/spf13/viper"
)
type BackendStorageFile interface {
@@ -45,7 +45,7 @@ var (
)
// used by master to load remote storage configurations
-func LoadConfiguration(config *viper.Viper) {
+func LoadConfiguration(config *util.ViperProxy) {
StorageBackendPrefix := "storage.backend"
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 8f457be1d..7201503f1 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -193,9 +193,9 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
mtype = contentType
}
- pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
- // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
}
+ pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+ // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
return
}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 0a4df63d0..eaed51654 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -151,7 +151,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro
}
func (dn *DataNode) GetDataCenter() *DataCenter {
- return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+ rack := dn.Parent()
+ dcNode := rack.Parent()
+ dcValue := dcNode.GetValue()
+ return dcValue.(*DataCenter)
}
func (dn *DataNode) GetRack() *Rack {
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 931e45e9a..1eb00e1fa 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -3,7 +3,7 @@ package chunk_cache
import (
"time"
- "github.com/karlseguin/ccache"
+ "github.com/karlseguin/ccache/v2"
)
// a global cache for recently accessed file chunks
diff --git a/weed/util/config.go b/weed/util/config.go
index 94e621e34..ee805f26a 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -2,6 +2,7 @@ package util
import (
"strings"
+ "sync"
"github.com/spf13/viper"
@@ -28,11 +29,11 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- logLevel := glog.Level(0)
if strings.Contains(err.Error(), "Not Found") {
- logLevel = 1
+ glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+ } else {
+ glog.Fatalf("Reading %s: %v", viper.ConfigFileUsed(), err)
}
- glog.V(logLevel).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
if required {
glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
"\n\nPlease use this command to generate the default %s.toml file\n"+
@@ -46,11 +47,55 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
return true
}
-func GetViper() *viper.Viper {
- v := &viper.Viper{}
- *v = *viper.GetViper()
- v.AutomaticEnv()
- v.SetEnvPrefix("weed")
- v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
- return v
+type ViperProxy struct {
+ *viper.Viper
+ sync.Mutex
+}
+
+var (
+ vp = &ViperProxy{}
+)
+
+func (vp *ViperProxy) SetDefault(key string, value interface{}) {
+ vp.Lock()
+ defer vp.Unlock()
+ vp.Viper.SetDefault(key, value)
+}
+
+func (vp *ViperProxy) GetString(key string) string {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetString(key)
+}
+
+func (vp *ViperProxy) GetBool(key string) bool {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetBool(key)
+}
+
+func (vp *ViperProxy) GetInt(key string) int {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetInt(key)
+}
+
+func (vp *ViperProxy) GetStringSlice(key string) []string {
+ vp.Lock()
+ defer vp.Unlock()
+ return vp.Viper.GetStringSlice(key)
+}
+
+func GetViper() *ViperProxy {
+ vp.Lock()
+ defer vp.Unlock()
+
+ if vp.Viper == nil {
+ vp.Viper = viper.GetViper()
+ vp.AutomaticEnv()
+ vp.SetEnvPrefix("weed")
+ vp.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ }
+
+ return vp
}
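
ViperProxy serializes access to the shared viper instance, since viper itself is not safe for concurrent use. A reduced sketch of the same wrapper pattern around a stand-in config map:

package main

import (
	"fmt"
	"sync"
)

// kv stands in for viper: a config lookup that is not safe for concurrent use.
type kv map[string]string

// proxy mirrors ViperProxy: it embeds a mutex and wraps each accessor,
// serializing all reads so the underlying store can be shared by goroutines.
type proxy struct {
	sync.Mutex
	store kv
}

func (p *proxy) GetString(key string) string {
	p.Lock()
	defer p.Unlock()
	return p.store[key]
}

func main() {
	p := &proxy{store: kv{"master.port": "9333"}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(p.GetString("master.port"))
		}()
	}
	wg.Wait()
}
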
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 95370746b..4e6a28334 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 17)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 21)
COMMIT = ""
)
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index e4310b5c5..f84c674ff 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -28,6 +28,7 @@ type LogBuffer struct {
pos int
startTime time.Time
stopTime time.Time
+ lastFlushTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
@@ -129,6 +130,7 @@ func (m *LogBuffer) loopFlush() {
// fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
d.releaseMemory()
+ m.lastFlushTime = d.stopTime
}
}
}
@@ -174,10 +176,14 @@ func (d *dataToFlush) releaseMemory() {
bufferPool.Put(d.data)
}
-func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) {
m.RLock()
defer m.RUnlock()
+ if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
+ return nil, ResumeFromDiskError
+ }
+
/*
fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
for i, prevBuf := range m.prevBuffers.buffers {
@@ -186,11 +192,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
*/
if lastReadTime.Equal(m.stopTime) {
- return nil
+ return nil, nil
}
if lastReadTime.After(m.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
- return nil
+ return nil, nil
}
if lastReadTime.Before(m.startTime) {
// println("checking ", lastReadTime.UnixNano())
@@ -198,19 +204,19 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
if buf.startTime.After(lastReadTime) {
if i == 0 {
// println("return the earliest in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size])
+ return copiedBytes(buf.buf[:buf.size]), nil
}
// println("return the", i, "th in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size])
+ return copiedBytes(buf.buf[:buf.size]), nil
}
if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
pos := buf.locateByTs(lastReadTime)
// fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
- return copiedBytes(buf.buf[pos:buf.size])
+ return copiedBytes(buf.buf[pos:buf.size]), nil
}
}
// println("return the current buf", lastReadTime.UnixNano())
- return copiedBytes(m.buf[:m.pos])
+ return copiedBytes(m.buf[:m.pos]), nil
}
lastTs := lastReadTime.UnixNano()
@@ -243,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
if prevT <= lastTs {
// fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
- return copiedBytes(m.buf[pos:m.pos])
+ return copiedBytes(m.buf[pos:m.pos]), nil
}
h = mid
}
@@ -251,7 +257,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
}
// FIXME: this could be that the buffer has been flushed already
- return nil
+ return nil, nil
}
func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
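
The new lastFlushTime field lets ReadFromBuffer detect when the requested position has already been flushed, signaling the caller to resume from the on-disk log. A toy version of that check:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errResumeFromDisk = errors.New("resume from disk")

// bufferWindow sketches the lastFlushTime check above: once data up to
// lastFlushTime has been flushed, a reader whose cursor is older than that
// cannot be served from memory and is redirected to the on-disk log.
type bufferWindow struct {
	lastFlushTime time.Time
}

func (b *bufferWindow) read(lastReadTime time.Time) ([]byte, error) {
	if !b.lastFlushTime.IsZero() && b.lastFlushTime.After(lastReadTime) {
		return nil, errResumeFromDisk
	}
	return []byte("in-memory data"), nil
}

func main() {
	w := &bufferWindow{lastFlushTime: time.Unix(100, 0)}
	if _, err := w.read(time.Unix(50, 0)); err != nil {
		fmt.Println(err) // resume from disk
	}
	data, _ := w.read(time.Unix(200, 0))
	fmt.Println(string(data))
}
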
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 57f4b0115..d6917abfe 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -13,7 +13,8 @@ import (
)
var (
- ResumeError = fmt.Errorf("resume")
+ ResumeError = fmt.Errorf("resume")
+ ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
func (logBuffer *LogBuffer) LoopProcessLogData(
@@ -34,7 +35,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
+ if err == ResumeFromDiskError {
+ return lastReadTime, ResumeFromDiskError
+ }
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
if bytesBuf == nil {
if waitForDataFn() {
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 773da0191..271baa132 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -15,6 +15,12 @@ const (
maxCursorIndex = 4096
)
+type HasLookupFileIdFunction interface {
+ GetLookupFileIdFunction() LookupFileIdFunctionType
+}
+
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
+
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
@@ -67,6 +73,10 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
+func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return vc.LookupFileId
+}
+
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
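
Declaring LookupFileIdFunctionType (and an interface exposing it) in wdclient lets consumers accept a lookup callback without importing the filer package. A self-contained sketch of the same decoupling:

package main

import "fmt"

// LookupFileIdFunctionType matches the function type moved into wdclient,
// letting other packages take a lookup callback instead of a concrete type.
type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)

type HasLookupFileIdFunction interface {
	GetLookupFileIdFunction() LookupFileIdFunctionType
}

type vidMap struct{ urls map[string][]string }

func (vc *vidMap) LookupFileId(fileId string) ([]string, error) {
	if urls, ok := vc.urls[fileId]; ok {
		return urls, nil
	}
	return nil, fmt.Errorf("%s not found", fileId)
}

func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
	return vc.LookupFileId
}

func main() {
	var src HasLookupFileIdFunction = &vidMap{urls: map[string][]string{
		"3,01637037d6": {"http://localhost:8080/3,01637037d6"},
	}}
	urls, err := src.GetLookupFileIdFunction()("3,01637037d6")
	fmt.Println(urls, err)
}
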