aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-22 17:44:52 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-22 17:44:52 -0800
commit1c7e1295dc4c91ab55920af9082498b55bdc3e66 (patch)
tree7c91e9c32958e339d8bd36b05382c1ab5d0e0ccb
parent2260864b458d33c03376eb789af8053712ae31c8 (diff)
parentda134a2eb74c12dec110db20cc882c768f2f41c2 (diff)
downloadseaweedfs-1c7e1295dc4c91ab55920af9082498b55bdc3e66.tar.xz
seaweedfs-1c7e1295dc4c91ab55920af9082498b55bdc3e66.zip
Merge branch 'master' into support_ssd_volume
-rw-r--r--.github/workflows/release.yml10
-rw-r--r--docker/Dockerfile1
-rw-r--r--docker/Makefile3
-rw-r--r--docker/local-k8s-compose.yml65
-rw-r--r--docker/seaweedfs.sql12
-rw-r--r--k8s/seaweedfs/values.yaml2
-rw-r--r--weed/Makefile2
-rw-r--r--weed/command/filer.go9
-rw-r--r--weed/command/scaffold.go28
-rw-r--r--weed/filer/cassandra/cassandra_store.go44
-rw-r--r--weed/filer/configuration.go63
-rw-r--r--weed/filer/filerstore.go253
-rw-r--r--weed/filer/filerstore_hardlink.go3
-rw-r--r--weed/filer/filerstore_translate_path.go161
-rw-r--r--weed/filer/filerstore_wrapper.go299
-rw-r--r--weed/filer/redis2/redis_cluster_store.go4
-rw-r--r--weed/filer/redis2/redis_store.go4
-rw-r--r--weed/filer/redis2/universal_redis_store.go27
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/shell/command_fs_configure.go2
-rw-r--r--weed/shell/command_s3_bucket_create.go (renamed from weed/shell/command_bucket_create.go)16
-rw-r--r--weed/shell/command_s3_bucket_delete.go (renamed from weed/shell/command_bucket_delete.go)14
-rw-r--r--weed/shell/command_s3_bucket_list.go (renamed from weed/shell/command_bucket_list.go)12
-rw-r--r--weed/util/constants.go2
24 files changed, 738 insertions, 300 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 70c14487c..2e1e9c162 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -12,8 +12,13 @@ jobs:
strategy:
matrix:
goos: [linux, windows, darwin, freebsd, netbsd, openbsd ]
- goarch: ["386", amd64]
- # goarch: ["386", amd64, arm]
+ goarch: ["386", amd64, arm]
+ exclude:
+ - goarch: arm
+ goos: darwin
+ - goarch: 386
+ goos: freebsd
+
steps:
- name: Check out code into the Go module directory
@@ -26,7 +31,6 @@ jobs:
tag: dev
fail-if-no-assets: false
assets: |
- weed-large-disk-*
weed-*
- name: Set BUILD_TIME env
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7146b91c7..be7414d0b 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -12,6 +12,7 @@ RUN \
SUPERCRONIC=supercronic-linux-$ARCH && \
# Install SeaweedFS and Supercronic ( for cron job mode )
apk add --no-cache --virtual build-dependencies --update wget curl ca-certificates && \
+ apk add fuse && \
wget -P /tmp https://github.com/$(curl -s -L https://github.com/chrislusf/seaweedfs/releases/latest | egrep -o "chrislusf/seaweedfs/releases/download/.*/linux_$ARCH.tar.gz") && \
tar -C /usr/bin/ -xzvf /tmp/linux_$ARCH.tar.gz && \
curl -fsSLO "$SUPERCRONIC_URL" && \
diff --git a/docker/Makefile b/docker/Makefile
index 8ab83ca18..c2e9a12e7 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -12,6 +12,9 @@ build:
dev: build
docker-compose -f local-dev-compose.yml -p seaweedfs up
+k8s: build
+ docker-compose -f local-k8s-compose.yml -p seaweedfs up
+
dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up
diff --git a/docker/local-k8s-compose.yml b/docker/local-k8s-compose.yml
new file mode 100644
index 000000000..0dda89ca4
--- /dev/null
+++ b/docker/local-k8s-compose.yml
@@ -0,0 +1,65 @@
+version: '2'
+
+services:
+ master:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9333:9333
+ - 19333:19333
+ command: "master -ip=master"
+ volume:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8080:8080
+ - 18080:18080
+ command: "volume -mserver=master:9333 -port=8080 -ip=volume"
+ depends_on:
+ - master
+ mysql:
+ image: percona/percona-server:5.7
+ ports:
+ - 3306:3306
+ volumes:
+ - ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql
+ environment:
+ - MYSQL_ROOT_PASSWORD=secret
+ - MYSQL_DATABASE=seaweedfs
+ - MYSQL_PASSWORD=secret
+ - MYSQL_USER=seaweedfs
+ filer:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8888:8888
+ - 18888:18888
+ environment:
+ - WEED_MYSQL_HOSTNAME=mysql
+ - WEED_MYSQL_PORT=3306
+ - WEED_MYSQL_DATABASE=seaweedfs
+ - WEED_MYSQL_USERNAME=seaweedfs
+ - WEED_MYSQL_PASSWORD=secret
+ - WEED_MYSQL_ENABLED=true
+ - WEED_LEVELDB2_ENABLED=false
+ command: 'filer -master="master:9333"'
+ depends_on:
+ - master
+ - volume
+ - mysql
+ ingress:
+ image: jwilder/nginx-proxy
+ ports:
+ - "80:80"
+ volumes:
+ - /var/run/docker.sock:/tmp/docker.sock:ro
+ - /tmp/nginx:/etc/nginx/conf.d
+ s3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8333:8333
+ command: 's3 -filer="filer:8888"'
+ depends_on:
+ - master
+ - volume
+ - filer
+ environment:
+ - VIRTUAL_HOST=s3
+ - VIRTUAL_PORT=8333 \ No newline at end of file
diff --git a/docker/seaweedfs.sql b/docker/seaweedfs.sql
new file mode 100644
index 000000000..38ebc575c
--- /dev/null
+++ b/docker/seaweedfs.sql
@@ -0,0 +1,12 @@
+CREATE DATABASE IF NOT EXISTS seaweedfs;
+CREATE USER IF NOT EXISTS 'seaweedfs'@'%' IDENTIFIED BY 'secret';
+GRANT ALL PRIVILEGES ON seaweedfs_fast.* TO 'seaweedfs'@'%';
+FLUSH PRIVILEGES;
+USE seaweedfs;
+CREATE TABLE IF NOT EXISTS filemeta (
+ dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+ name VARCHAR(1000) COMMENT 'directory or file name',
+ directory TEXT COMMENT 'full path to parent directory',
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+) DEFAULT CHARSET=utf8; \ No newline at end of file
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 72bdb684a..7667d2815 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- imageTag: "2.15"
+ imageTag: "2.16"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/weed/Makefile b/weed/Makefile
index 12b8e8173..fd0843c22 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -20,7 +20,7 @@ debug_mount:
debug_server:
go build -gcflags="all=-N -l"
- dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- server -dir=/Volumes/mobile_disk/99 -filer -volume.port=8343 -s3 -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1
debug_volume:
go build -gcflags="all=-N -l"
diff --git a/weed/command/filer.go b/weed/command/filer.go
index e72056893..a3008eb29 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -43,8 +43,6 @@ type FilerOptions struct {
peers *string
metricsHttpPort *int
cacheToFilerLimit *int
-
- // default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
}
@@ -67,6 +65,7 @@ func init() {
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -92,6 +91,7 @@ var cmdFiler = &Command{
GET /path/to/
The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
+ If the "filer.toml" is not found, an embedded filer store will be craeted under "-defaultStoreDir".
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
@@ -127,10 +127,7 @@ func (fo *FilerOptions) startFiler() {
publicVolumeMux = http.NewServeMux()
}
- defaultLevelDbDirectory := "./filerldb2"
- if fo.defaultLevelDbDirectory != nil {
- defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
- }
+ defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
var peers []string
if *fo.peers != "" {
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 82410f6d9..04a988027 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -138,12 +138,16 @@ hosts=[
]
username=""
password=""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[redis_cluster2]
enabled = false
@@ -160,6 +164,8 @@ password = ""
readOnly = true
# automatically use the closest Redis server for reads
routeByLatency = true
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[etcd]
enabled = false
@@ -185,6 +191,28 @@ sniff_enabled = false
healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000
+
+
+
+##########################
+##########################
+# To add path-specific filer store:
+#
+# 1. Add a name following the store type separated by a dot ".". E.g., cassandra.tmp
+# 2. Add a location configuraiton. E.g., location = "/tmp/"
+# 3. Copy and customize all other configurations.
+# Make sure they are not the same if using the same store type!
+# 4. Set enabled to true
+#
+# The following is just using cassandra as an example
+##########################
+[redis2.tmp]
+enabled = false
+location = "/tmp/"
+address = "localhost:6379"
+password = ""
+database = 1
+
`
NOTIFICATION_TOML_EXAMPLE = `
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index ae8cb7a86..49f5625d9 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -16,8 +16,9 @@ func init() {
}
type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+ superLargeDirectoryHash map[string]string
}
func (store *CassandraStore) GetName() string {
@@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
+func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
+ dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
@@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam
if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
}
+
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]string)
+ existingHash := make(map[string]string)
+ for _, dir := range superLargeDirectories {
+ // adding dir hash to avoid duplicated names
+ dirHash := util.Md5String([]byte(dir))[:4]
+ store.superLargeDirectoryHash[dir] = dirHash
+ if existingDir, found := existingHash[dirHash]; found {
+ glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
+ }
+ existingHash[dirHash] = dir
+ }
return
}
@@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
@@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?",
@@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
}
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
+
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
if inclusive {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
index 3dce67d6d..a6f18709e 100644
--- a/weed/filer/configuration.go
+++ b/weed/filer/configuration.go
@@ -1,10 +1,11 @@
package filer
import (
- "os"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
+ "os"
+ "reflect"
+ "strings"
)
var (
@@ -15,25 +16,67 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
validateOneEnabledStore(config)
+ // load configuration for default filer store
+ hasDefaultStoreConfigured := false
for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") {
+ store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
if err := store.Initialize(config, store.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize store for %s: %+v",
- store.GetName(), err)
+ glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
}
f.SetStore(store)
- glog.V(0).Infof("Configure filer for %s", store.GetName())
- return
+ glog.V(0).Infof("configured filer store to %s", store.GetName())
+ hasDefaultStoreConfigured = true
+ break
+ }
+ }
+
+ if !hasDefaultStoreConfigured {
+ println()
+ println("Supported filer stores are:")
+ for _, store := range Stores {
+ println(" " + store.GetName())
}
+ os.Exit(-1)
}
- println()
- println("Supported filer stores are:")
+ // load path-specific filer store here
+ // f.Store.AddPathSpecificStore(path, store)
+ storeNames := make(map[string]FilerStore)
for _, store := range Stores {
- println(" " + store.GetName())
+ storeNames[store.GetName()] = store
+ }
+ allKeys := config.AllKeys()
+ for _, key := range allKeys {
+ if !strings.HasSuffix(key, ".enabled") {
+ continue
+ }
+ key = key[:len(key)-len(".enabled")]
+ if !strings.Contains(key, ".") {
+ continue
+ }
+
+ parts := strings.Split(key, ".")
+ storeName, storeId := parts[0], parts[1]
+
+ store, found := storeNames[storeName]
+ if !found {
+ continue
+ }
+ store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
+ if err := store.Initialize(config, key+"."); err != nil {
+ glog.Fatalf("Failed to initialize store for %s: %+v", key, err)
+ }
+ location := config.GetString(key + ".location")
+ if location == "" {
+ glog.Errorf("path-specific filer store needs %s", key+".location")
+ os.Exit(-1)
+ }
+ f.Store.AddPathSpecificStore(location, storeId, store)
+
+ glog.V(0).Infof("configure filer %s for %s", store.GetName(), location)
}
- os.Exit(-1)
}
func validateOneEnabledStore(config *viper.Viper) {
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 4b28c3021..f1e6c6c35 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -3,19 +3,14 @@ package filer
import (
"context"
"errors"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
- ErrKvNotImplemented = errors.New("kv not implemented yet")
- ErrKvNotFound = errors.New("kv: not found")
+ ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
+ ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
+ ErrKvNotImplemented = errors.New("kv not implemented yet")
+ ErrKvNotFound = errors.New("kv: not found")
)
type FilerStore interface {
@@ -42,243 +37,3 @@ type FilerStore interface {
Shutdown()
}
-
-type VirtualFilerStore interface {
- FilerStore
- DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
- DeleteOneEntry(ctx context.Context, entry *Entry) error
-}
-
-type FilerStoreWrapper struct {
- ActualStore FilerStore
-}
-
-func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
- if innerStore, ok := store.(*FilerStoreWrapper); ok {
- return innerStore
- }
- return &FilerStoreWrapper{
- ActualStore: store,
- }
-}
-
-func (fsw *FilerStoreWrapper) GetName() string {
- return fsw.ActualStore.GetName()
-}
-
-func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
- return fsw.ActualStore.Initialize(configuration, prefix)
-}
-
-func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
-
- if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
- return err
- }
-
- glog.V(4).Infof("InsertEntry %s", entry.FullPath)
- return fsw.ActualStore.InsertEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
-
- if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
- return err
- }
-
- glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
- return fsw.ActualStore.UpdateEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("FindEntry %s", fp)
- entry, err = fsw.ActualStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
-
- fsw.maybeReadHardLink(ctx, entry)
-
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- return
-}
-
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- existingEntry, findErr := fsw.FindEntry(ctx, fp)
- if findErr == filer_pb.ErrNotFound {
- return nil
- }
- if len(existingEntry.HardLinkId) != 0 {
- // remove hard link
- glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
- if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
- return err
- }
- }
-
- glog.V(4).Infof("DeleteEntry %s", fp)
- return fsw.ActualStore.DeleteEntry(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- if len(existingEntry.HardLinkId) != 0 {
- // remove hard link
- glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
- if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
- return err
- }
- }
-
- glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
- return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
-}
-
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("DeleteFolderChildren %s", fp)
- return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
- stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
- }()
- glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- if err == ErrUnsupportedListDirectoryPrefixed {
- entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- }
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, nil
-}
-
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
- entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
-
- if prefix == "" {
- return
- }
-
- count := 0
- var lastFileName string
- notPrefixed := entries
- entries = nil
- for count < limit && len(notPrefixed) > 0 {
- for _, entry := range notPrefixed {
- lastFileName = entry.Name()
- if strings.HasPrefix(entry.Name(), prefix) {
- count++
- entries = append(entries, entry)
- if count >= limit {
- break
- }
- }
- }
- if count < limit {
- notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
- return fsw.ActualStore.BeginTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
- return fsw.ActualStore.CommitTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
- return fsw.ActualStore.RollbackTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) Shutdown() {
- fsw.ActualStore.Shutdown()
-}
-
-func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- return fsw.ActualStore.KvPut(ctx, key, value)
-}
-func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- return fsw.ActualStore.KvGet(ctx, key)
-}
-func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
- return fsw.ActualStore.KvDelete(ctx, key)
-}
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
index c6b3734b0..316c76a0c 100644
--- a/weed/filer/filerstore_hardlink.go
+++ b/weed/filer/filerstore_hardlink.go
@@ -19,7 +19,8 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
// check what is existing entry
glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
- existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
+ actualStore := fsw.getActualStore(entry.FullPath)
+ existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
}
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
new file mode 100644
index 000000000..ea0f9db77
--- /dev/null
+++ b/weed/filer/filerstore_translate_path.go
@@ -0,0 +1,161 @@
+package filer
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+)
+
+var (
+ _ = FilerStore(&FilerStorePathTranlator{})
+)
+
+type FilerStorePathTranlator struct {
+ actualStore FilerStore
+ storeRoot string
+}
+
+func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator {
+ if innerStore, ok := store.(*FilerStorePathTranlator); ok {
+ return innerStore
+ }
+
+ if !strings.HasSuffix(storeRoot, "/") {
+ storeRoot += "/"
+ }
+
+ return &FilerStorePathTranlator{
+ actualStore: store,
+ storeRoot: storeRoot,
+ }
+}
+
+func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) {
+ newPath = fp
+ if t.storeRoot == "/" {
+ return
+ }
+ newPath = fp[len(t.storeRoot)-1:]
+ if newPath == "" {
+ newPath = "/"
+ }
+ return
+}
+func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) {
+ previousPath = entry.FullPath
+ if t.storeRoot == "/" {
+ return
+ }
+ entry.FullPath = t.translatePath(previousPath)
+ return
+}
+func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) {
+ entry.FullPath = previousPath
+}
+
+func (t *FilerStorePathTranlator) GetName() string {
+ return t.actualStore.GetName()
+}
+
+func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error {
+ return t.actualStore.Initialize(configuration, prefix)
+}
+
+func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error {
+ previousPath := t.changeEntryPath(entry)
+ defer t.recoverEntryPath(entry, previousPath)
+
+ return t.actualStore.InsertEntry(ctx, entry)
+}
+
+func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error {
+ previousPath := t.changeEntryPath(entry)
+ defer t.recoverEntryPath(entry, previousPath)
+
+ return t.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+ if t.storeRoot == "/" {
+ return t.actualStore.FindEntry(ctx, fp)
+ }
+ newFullPath := t.translatePath(fp)
+ entry, err = t.actualStore.FindEntry(ctx, newFullPath)
+ if err == nil {
+ entry.FullPath = fp[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return
+}
+
+func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ newFullPath := t.translatePath(fp)
+ return t.actualStore.DeleteEntry(ctx, newFullPath)
+}
+
+func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+
+ previousPath := t.changeEntryPath(existingEntry)
+ defer t.recoverEntryPath(existingEntry, previousPath)
+
+ return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath)
+}
+
+func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ newFullPath := t.translatePath(fp)
+
+ return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
+}
+
+func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+
+ newFullPath := t.translatePath(dirPath)
+
+ entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return entries, err
+}
+
+func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+
+ newFullPath := t.translatePath(dirPath)
+
+ entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return entries, nil
+}
+
+func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return t.actualStore.BeginTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error {
+ return t.actualStore.CommitTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error {
+ return t.actualStore.RollbackTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) Shutdown() {
+ t.actualStore.Shutdown()
+}
+
+func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return t.actualStore.KvPut(ctx, key, value)
+}
+func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return t.actualStore.KvGet(ctx, key)
+}
+func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) {
+ return t.actualStore.KvDelete(ctx, key)
+}
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
new file mode 100644
index 000000000..3206d5ba4
--- /dev/null
+++ b/weed/filer/filerstore_wrapper.go
@@ -0,0 +1,299 @@
+package filer
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/viant/ptrie"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ _ = VirtualFilerStore(&FilerStoreWrapper{})
+)
+
+type VirtualFilerStore interface {
+ FilerStore
+ DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
+ DeleteOneEntry(ctx context.Context, entry *Entry) error
+ AddPathSpecificStore(path string, storeId string, store FilerStore)
+}
+
+type FilerStoreWrapper struct {
+ defaultStore FilerStore
+ pathToStore ptrie.Trie
+ storeIdToStore map[string]FilerStore
+}
+
+func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
+ if innerStore, ok := store.(*FilerStoreWrapper); ok {
+ return innerStore
+ }
+ return &FilerStoreWrapper{
+ defaultStore: store,
+ pathToStore: ptrie.New(),
+ storeIdToStore: make(map[string]FilerStore),
+ }
+}
+
+func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
+ fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
+ err := fsw.pathToStore.Put([]byte(path), storeId)
+ if err != nil {
+ glog.Fatalf("put path specific store: %v", err)
+ }
+}
+
+func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
+ store = fsw.defaultStore
+ if path == "/" {
+ return
+ }
+ var storeId string
+ fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
+ storeId = value.(string)
+ return false
+ })
+ if storeId != "" {
+ store = fsw.storeIdToStore[storeId]
+ }
+ return
+}
+
+func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
+ return fsw.defaultStore
+}
+
+func (fsw *FilerStoreWrapper) GetName() string {
+ return fsw.getDefaultStore().GetName()
+}
+
+func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
+ return fsw.getDefaultStore().Initialize(configuration, prefix)
+}
+
+func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
+ actualStore := fsw.getActualStore(entry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
+ }
+
+ glog.V(4).Infof("InsertEntry %s", entry.FullPath)
+ return actualStore.InsertEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
+ actualStore := fsw.getActualStore(entry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
+ }
+
+ glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
+ return actualStore.UpdateEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+ actualStore := fsw.getActualStore(fp)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("FindEntry %s", fp)
+ entry, err = actualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+
+ fsw.maybeReadHardLink(ctx, entry)
+
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
+}
+
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fp)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ existingEntry, findErr := fsw.FindEntry(ctx, fp)
+ if findErr == filer_pb.ErrNotFound {
+ return nil
+ }
+ if len(existingEntry.HardLinkId) != 0 {
+ // remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+
+ glog.V(4).Infof("DeleteEntry %s", fp)
+ return actualStore.DeleteEntry(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+ actualStore := fsw.getActualStore(existingEntry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ if len(existingEntry.HardLinkId) != 0 {
+ // remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+
+ glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
+ return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
+}
+
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fp + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("DeleteFolderChildren %s", fp)
+ return actualStore.DeleteFolderChildren(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
+ entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
+ }()
+ glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
+ entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ if err == ErrUnsupportedListDirectoryPrefixed {
+ entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ }
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, nil
+}
+
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+
+ if prefix == "" {
+ return
+ }
+
+ count := 0
+ var lastFileName string
+ notPrefixed := entries
+ entries = nil
+ for count < limit && len(notPrefixed) > 0 {
+ for _, entry := range notPrefixed {
+ lastFileName = entry.Name()
+ if strings.HasPrefix(entry.Name(), prefix) {
+ count++
+ entries = append(entries, entry)
+ if count >= limit {
+ break
+ }
+ }
+ }
+ if count < limit {
+ notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
+
+func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return fsw.getDefaultStore().BeginTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
+ return fsw.getDefaultStore().CommitTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
+ return fsw.getDefaultStore().RollbackTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) Shutdown() {
+ fsw.getDefaultStore().Shutdown()
+}
+
+func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return fsw.getDefaultStore().KvPut(ctx, key, value)
+}
+func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return fsw.getDefaultStore().KvGet(ctx, key)
+}
+func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
+ return fsw.getDefaultStore().KvDelete(ctx, key)
+}
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
index d155dbe88..c7742bb19 100644
--- a/weed/filer/redis2/redis_cluster_store.go
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index ed04c817b..da404ed4c 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 0374314c0..00d02ea14 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -18,7 +18,21 @@ const (
)
type UniversalRedis2Store struct {
- Client redis.UniversalClient
+ Client redis.UniversalClient
+ superLargeDirectoryHash map[string]bool
+}
+
+func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
+ _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) {
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]bool)
+ for _, dir := range superLargeDirectories {
+ store.superLargeDirectoryHash[dir] = true
+ }
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
@@ -47,6 +61,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
dir, name := entry.FullPath.DirAndName()
+ if store.isSuperLargeDirectory(dir) {
+ return nil
+ }
+
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
@@ -96,6 +114,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
}
dir, name := fullpath.DirAndName()
+ if store.isSuperLargeDirectory(dir) {
+ return nil
+ }
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
@@ -108,6 +129,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ if store.isSuperLargeDirectory(string(fullpath)) {
+ return nil
+ }
+
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index d04053df5..2991d14ab 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -109,6 +109,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
+ } else {
+ glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index a1e8ca581..a891c1b47 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -88,7 +88,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
// check collection
if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") {
- return fmt.Errorf("one s3 bucket goes to one collection and not customizable.")
+ return fmt.Errorf("one s3 bucket goes to one collection and not customizable")
}
// check replication
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index 52d96e4c3..28cf1d945 100644
--- a/weed/shell/command_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -12,29 +12,29 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketCreate{})
+ Commands = append(Commands, &commandS3BucketCreate{})
}
-type commandBucketCreate struct {
+type commandS3BucketCreate struct {
}
-func (c *commandBucketCreate) Name() string {
- return "bucket.create"
+func (c *commandS3BucketCreate) Name() string {
+ return "s3.bucket.create"
}
-func (c *commandBucketCreate) Help() string {
+func (c *commandS3BucketCreate) Help() string {
return `create a bucket with a given name
Example:
- bucket.create -name <bucket_name> -replication 001
+ s3.bucket.create -name <bucket_name> -replication 001
`
}
-func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name")
- replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+ replication := bucketCommand.String("replication", "", "replication setting for the bucket, if not set it will honor the setting defined by the filer or master")
if err = bucketCommand.Parse(args); err != nil {
return nil
}
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 02790b9e2..a8d8c5c29 100644
--- a/weed/shell/command_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -9,24 +9,24 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketDelete{})
+ Commands = append(Commands, &commandS3BucketDelete{})
}
-type commandBucketDelete struct {
+type commandS3BucketDelete struct {
}
-func (c *commandBucketDelete) Name() string {
- return "bucket.delete"
+func (c *commandS3BucketDelete) Name() string {
+ return "s3.bucket.delete"
}
-func (c *commandBucketDelete) Help() string {
+func (c *commandS3BucketDelete) Help() string {
return `delete a bucket by a given name
- bucket.delete -name <bucket_name>
+ s3.bucket.delete -name <bucket_name>
`
}
-func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name")
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index 2e446b6b2..4acf9a866 100644
--- a/weed/shell/command_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -11,23 +11,23 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketList{})
+ Commands = append(Commands, &commandS3BucketList{})
}
-type commandBucketList struct {
+type commandS3BucketList struct {
}
-func (c *commandBucketList) Name() string {
- return "bucket.list"
+func (c *commandS3BucketList) Name() string {
+ return "s3.bucket.list"
}
-func (c *commandBucketList) Help() string {
+func (c *commandS3BucketList) Help() string {
return `list all buckets
`
}
-func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
if err = bucketCommand.Parse(args); err != nil {
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 52ba08494..89155e9a2 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 15)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16)
COMMIT = ""
)