-rw-r--r--  .github/workflows/release.yml                          |   8
-rw-r--r--  README.md                                              |   6
-rw-r--r--  docker/Makefile                                        |   3
-rw-r--r--  docker/local-k8s-compose.yml                           |  65
-rw-r--r--  docker/seaweedfs.sql                                   |  12
-rw-r--r--  go.mod                                                 |   1
-rw-r--r--  go.sum                                                 |  15
-rw-r--r--  k8s/seaweedfs/values.yaml                              |   2
-rw-r--r--  weed/command/filer.go                                  |  10
-rw-r--r--  weed/command/s3.go                                     |  17
-rw-r--r--  weed/command/scaffold.go                               |  11
-rw-r--r--  weed/command/server.go                                 |   1
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store_kv.go       |   2
-rw-r--r--  weed/filer/cassandra/cassandra_store.go                |  44
-rw-r--r--  weed/filer/configuration.go                            |   9
-rw-r--r--  weed/filer/filer.go                                    | 148
-rw-r--r--  weed/filer/filer_deletion.go                           |   1
-rw-r--r--  weed/filer/filer_search.go                             |  76
-rw-r--r--  weed/filer/filerstore.go                               | 294
-rw-r--r--  weed/filer/filerstore_hardlink.go                      |   3
-rw-r--r--  weed/filer/filerstore_translate_path.go                | 161
-rw-r--r--  weed/filer/filerstore_wrapper.go                       | 299
-rw-r--r--  weed/filer/hbase/hbase_store.go                        | 227
-rw-r--r--  weed/filer/hbase/hbase_store_kv.go                     |  75
-rw-r--r--  weed/filer/read_write.go                               |  12
-rw-r--r--  weed/filer/redis2/redis_cluster_store.go               |   4
-rw-r--r--  weed/filer/redis2/redis_store.go                       |   4
-rw-r--r--  weed/filer/redis2/universal_redis_store.go             |  27
-rw-r--r--  weed/s3api/auth_credentials.go                         |  45
-rw-r--r--  weed/s3api/http/header.go                              |   4
-rw-r--r--  weed/s3api/s3_constants/s3_actions.go                  |   1
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go                    |  13
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go              |  16
-rw-r--r--  weed/s3api/s3api_server.go                             |   3
-rw-r--r--  weed/server/filer_grpc_server.go                       |   2
-rw-r--r--  weed/server/filer_server.go                            |   3
-rw-r--r--  weed/server/filer_server_handlers_read.go              |   2
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go          |   3
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go   |  31
-rw-r--r--  weed/shell/command_fs_configure.go                     |   2
-rw-r--r--  weed/shell/command_s3_bucket_create.go (renamed from weed/shell/command_bucket_create.go) |  18
-rw-r--r--  weed/shell/command_s3_bucket_delete.go (renamed from weed/shell/command_bucket_delete.go) |  14
-rw-r--r--  weed/shell/command_s3_bucket_list.go (renamed from weed/shell/command_bucket_list.go)     |  12
-rw-r--r--  weed/storage/volume.go                                 |   2
-rw-r--r--  weed/storage/volume_loading.go                         |   8
-rw-r--r--  weed/util/config.go                                    |  10
-rw-r--r--  weed/util/constants.go                                 |   2
47 files changed, 1266 insertions(+), 462 deletions(-)
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 2e1e9c162..39108c30a 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -11,13 +11,13 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- goos: [linux, windows, darwin, freebsd, netbsd, openbsd ]
- goarch: ["386", amd64, arm]
+ goos: [linux, windows, darwin ]
+ goarch: [amd64, arm]
exclude:
- goarch: arm
goos: darwin
- - goarch: 386
- goos: freebsd
+ - goarch: arm
+ goos: windows
steps:
diff --git a/README.md b/README.md
index 923331853..70e2ba0b2 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,7 @@ There is only 40 bytes of disk storage overhead for each file's metadata. It is
SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf). Also, SeaweedFS implements erasure coding with ideas from [f4: Facebook’s Warm BLOB Storage System](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-muralidhar.pdf)
-On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Mongodb, Redis, Cassandra, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc.
+On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc.
[Back to TOC](#table-of-contents)
@@ -365,7 +365,7 @@ The architectures are mostly the same. SeaweedFS aims to store and read files fa
* SeaweedFS optimizes for small files, ensuring O(1) disk seek operation, and can also handle large files.
* SeaweedFS statically assigns a volume id for a file. Locating file content becomes just a lookup of the volume id, which can be easily cached.
-* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Cassandra, Mongodb, Redis, Elastic Search, MySql, Postgres, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized.
+* SeaweedFS Filer metadata store can be any well-known and proven data store, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, MemSql, TiDB, CockroachDB, Etcd, etc., and is easy to customize.
* SeaweedFS Volume server also communicates directly with clients via HTTP, supporting range queries, direct uploads, etc.
| System | File Metadata | File Content Read| POSIX | REST API | Optimized for large number of small files |
@@ -407,7 +407,7 @@ Ceph uses CRUSH hashing to automatically manage the data placement. SeaweedFS pl
SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read.
-SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Mongodb, Redis, Elastic Search, Cassandra, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage.
+SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachDB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage.
| SeaweedFS | comparable to Ceph | advantage |
| ------------- | ------------- | ---------------- |
diff --git a/docker/Makefile b/docker/Makefile
index 8ab83ca18..c2e9a12e7 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -12,6 +12,9 @@ build:
dev: build
docker-compose -f local-dev-compose.yml -p seaweedfs up
+k8s: build
+ docker-compose -f local-k8s-compose.yml -p seaweedfs up
+
dev_registry: build
docker-compose -f local-registry-compose.yml -p seaweedfs up
diff --git a/docker/local-k8s-compose.yml b/docker/local-k8s-compose.yml
new file mode 100644
index 000000000..0dda89ca4
--- /dev/null
+++ b/docker/local-k8s-compose.yml
@@ -0,0 +1,65 @@
+version: '2'
+
+services:
+ master:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 9333:9333
+ - 19333:19333
+ command: "master -ip=master"
+ volume:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8080:8080
+ - 18080:18080
+ command: "volume -mserver=master:9333 -port=8080 -ip=volume"
+ depends_on:
+ - master
+ mysql:
+ image: percona/percona-server:5.7
+ ports:
+ - 3306:3306
+ volumes:
+ - ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql
+ environment:
+ - MYSQL_ROOT_PASSWORD=secret
+ - MYSQL_DATABASE=seaweedfs
+ - MYSQL_PASSWORD=secret
+ - MYSQL_USER=seaweedfs
+ filer:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8888:8888
+ - 18888:18888
+ environment:
+ - WEED_MYSQL_HOSTNAME=mysql
+ - WEED_MYSQL_PORT=3306
+ - WEED_MYSQL_DATABASE=seaweedfs
+ - WEED_MYSQL_USERNAME=seaweedfs
+ - WEED_MYSQL_PASSWORD=secret
+ - WEED_MYSQL_ENABLED=true
+ - WEED_LEVELDB2_ENABLED=false
+ command: 'filer -master="master:9333"'
+ depends_on:
+ - master
+ - volume
+ - mysql
+ ingress:
+ image: jwilder/nginx-proxy
+ ports:
+ - "80:80"
+ volumes:
+ - /var/run/docker.sock:/tmp/docker.sock:ro
+ - /tmp/nginx:/etc/nginx/conf.d
+ s3:
+ image: chrislusf/seaweedfs:local
+ ports:
+ - 8333:8333
+ command: 's3 -filer="filer:8888"'
+ depends_on:
+ - master
+ - volume
+ - filer
+ environment:
+ - VIRTUAL_HOST=s3
+      - VIRTUAL_PORT=8333
\ No newline at end of file
diff --git a/docker/seaweedfs.sql b/docker/seaweedfs.sql
new file mode 100644
index 000000000..38ebc575c
--- /dev/null
+++ b/docker/seaweedfs.sql
@@ -0,0 +1,12 @@
+CREATE DATABASE IF NOT EXISTS seaweedfs;
+CREATE USER IF NOT EXISTS 'seaweedfs'@'%' IDENTIFIED BY 'secret';
+GRANT ALL PRIVILEGES ON seaweedfs.* TO 'seaweedfs'@'%';
+FLUSH PRIVILEGES;
+USE seaweedfs;
+CREATE TABLE IF NOT EXISTS filemeta (
+ dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
+ name VARCHAR(1000) COMMENT 'directory or file name',
+ directory TEXT COMMENT 'full path to parent directory',
+ meta LONGBLOB,
+ PRIMARY KEY (dirhash, name)
+) DEFAULT CHARSET=utf8;
\ No newline at end of file
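The filemeta table keys each row by (dirhash, name): dirhash packs the first 64 bits of the MD5 of the parent directory path, so all children of a directory share one index prefix. A minimal sketch of that folding, assuming the byte order of util.HashStringToLong, the helper the SQL stores use (not shown in this diff):

package main

import (
	"crypto/md5"
	"fmt"
)

// hashStringToLong folds the first 8 bytes of the MD5 digest into a signed
// 64-bit value, i.e. the BIGINT dirhash column above.
func hashStringToLong(dir string) (v int64) {
	b := md5.Sum([]byte(dir))
	for i := 0; i < 8; i++ {
		v = v<<8 + int64(b[i])
	}
	return v
}

func main() {
	// Children of the same directory share a dirhash, so a listing becomes
	// a range scan on (dirhash, name) rather than a full-table filter.
	fmt.Println(hashStringToLong("/buckets/images"))
	fmt.Println(hashStringToLong("/buckets/images/2020"))
}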
diff --git a/go.mod b/go.mod
index 1ecbfd2a9..d93a44379 100644
--- a/go.mod
+++ b/go.mod
@@ -68,6 +68,7 @@ require (
github.com/syndtr/goleveldb v1.0.0
github.com/tidwall/gjson v1.3.2
github.com/tidwall/match v1.0.1
+ github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365
github.com/valyala/bytebufferpool v1.0.0
github.com/viant/assertly v0.5.4 // indirect
github.com/viant/ptrie v0.3.0
diff --git a/go.sum b/go.sum
index 387d2a8ac..66fb72a6b 100644
--- a/go.sum
+++ b/go.sum
@@ -179,6 +179,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
+github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg=
@@ -223,6 +225,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
@@ -539,6 +542,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@@ -562,6 +566,8 @@ github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q=
+github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0=
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDqSt+GTGFMVlhk3ULuV0y9ZmzeVGR4mloJI3M=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@@ -621,6 +627,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 h1:6iRwZdrFUzbcVYZwa8dXTIILGIxmmhjyUPJEcwzPGaU=
+github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365/go.mod h1:zj0GJHGvyf1ed3Jm/Tb4830c/ZKDq+YoLsCt2rGQuT0=
github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
@@ -785,6 +793,7 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfru
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd h1:WgqgiQvkiZWz7XLhphjt2GI2GcGCTIZs9jqXMWmH+oc=
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
@@ -930,8 +939,14 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
+modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o=
+modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg=
+modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
+modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
+rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
+rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 7667d2815..7144b0016 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- imageTag: "2.16"
+ imageTag: "2.18"
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/weed/command/filer.go b/weed/command/filer.go
index e72056893..146840e00 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -43,8 +43,6 @@ type FilerOptions struct {
peers *string
metricsHttpPort *int
cacheToFilerLimit *int
-
- // default leveldb directory, used in "weed server" mode
defaultLevelDbDirectory *string
}
@@ -67,6 +65,7 @@ func init() {
f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list")
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -75,6 +74,7 @@ func init() {
filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
+ filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
}
var cmdFiler = &Command{
@@ -92,6 +92,7 @@ var cmdFiler = &Command{
GET /path/to/
The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
+ If the "filer.toml" is not found, an embedded filer store will be created under "-defaultStoreDir".
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
@@ -127,10 +128,7 @@ func (fo *FilerOptions) startFiler() {
publicVolumeMux = http.NewServeMux()
}
- defaultLevelDbDirectory := "./filerldb2"
- if fo.defaultLevelDbDirectory != nil {
- defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
- }
+ defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
var peers []string
if *fo.peers != "" {
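Since the flag is now registered with a default of ".", fo.defaultLevelDbDirectory can no longer be nil, which is why the guard was dropped: the embedded store always lives at <defaultStoreDir>/filerldb2. A trivial sketch of the resolution (filepath.Abs stands in for util.ResolvePath, which additionally expands a leading "~"):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	defaultStoreDir := "." // the -defaultStoreDir flag's default
	dir, err := filepath.Abs(defaultStoreDir + "/filerldb2")
	if err != nil {
		panic(err)
	}
	fmt.Println(dir) // e.g. /current/working/dir/filerldb2
}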
diff --git a/weed/command/s3.go b/weed/command/s3.go
index ed5bb0b80..d8e3e306b 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -23,13 +23,14 @@ var (
)
type S3Options struct {
- filer *string
- port *int
- config *string
- domainName *string
- tlsPrivateKey *string
- tlsCertificate *string
- metricsHttpPort *int
+ filer *string
+ port *int
+ config *string
+ domainName *string
+ tlsPrivateKey *string
+ tlsCertificate *string
+ metricsHttpPort *int
+ allowEmptyFolder *bool
}
func init() {
@@ -41,6 +42,7 @@ func init() {
s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
+ s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
}
var cmdS3 = &Command{
@@ -181,6 +183,7 @@ func (s3opt *S3Options) startS3Server() bool {
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
+ AllowEmptyFolder: *s3opt.allowEmptyFolder,
})
if s3ApiServer_err != nil {
glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 1ab763004..6cfd46427 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -138,12 +138,21 @@ hosts=[
]
username=""
password=""
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
+
+[hbase]
+enabled = false
+zkquorum = ""
+table = "seaweedfs"
[redis2]
enabled = false
address = "localhost:6379"
password = ""
database = 0
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[redis_cluster2]
enabled = false
@@ -160,6 +169,8 @@ password = ""
readOnly = true
# automatically use the closest Redis server for reads
routeByLatency = true
+# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
+superLargeDirectories = []
[etcd]
enabled = false
diff --git a/weed/command/server.go b/weed/command/server.go
index 7e63f8e8a..bd25f94b1 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -113,6 +113,7 @@ func init() {
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders")
msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index 81d105134..792a45ff4 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -34,7 +34,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
}
_, err = res.RowsAffected()
- if err != nil {
+ if err != nil {
return fmt.Errorf("kv upsert no rows affected: %s", err)
}
return nil
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index ae8cb7a86..49f5625d9 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -16,8 +16,9 @@ func init() {
}
type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
+ cluster *gocql.ClusterConfig
+ session *gocql.Session
+ superLargeDirectoryHash map[string]string
}
func (store *CassandraStore) GetName() string {
@@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
+func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
+ dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
@@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam
if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
}
+
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]string)
+ existingHash := make(map[string]string)
+ for _, dir := range superLargeDirectories {
+ // adding dir hash to avoid duplicated names
+ dirHash := util.Md5String([]byte(dir))[:4]
+ store.superLargeDirectoryHash[dir] = dirHash
+ if existingDir, found := existingHash[dirHash]; found {
+ glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
+ }
+ existingHash[dirHash] = dir
+ }
return
}
@@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry
func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
+
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
@@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
dir, name := fullpath.DirAndName()
+ if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
+ dir, name = dirHash+name, ""
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=? AND name=?",
@@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full
}
func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
if err := store.session.Query(
"DELETE FROM filemeta WHERE directory=?",
@@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
+ if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
+ }
+
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
if inclusive {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
index 5f5dfed25..a6f18709e 100644
--- a/weed/filer/configuration.go
+++ b/weed/filer/configuration.go
@@ -25,7 +25,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
}
f.SetStore(store)
- glog.V(0).Infof("configured filer for %s", store.GetName())
+ glog.V(0).Infof("configured filer store to %s", store.GetName())
hasDefaultStoreConfigured = true
break
}
@@ -59,12 +59,15 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
parts := strings.Split(key, ".")
storeName, storeId := parts[0], parts[1]
- store := storeNames[storeName]
+ store, found := storeNames[storeName]
+ if !found {
+ continue
+ }
store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore)
if err := store.Initialize(config, key+"."); err != nil {
glog.Fatalf("Failed to initialize store for %s: %+v", key, err)
}
- location := config.GetString(key+".location")
+ location := config.GetString(key + ".location")
if location == "" {
glog.Errorf("path-specific filer store needs %s", key+".location")
os.Exit(-1)
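The lookup-with-found check above exists because config keys can now name path-specific stores: a section key of the form storeName.storeId (for example [redis2.tmp] with location = "/tmp" in filer.toml; the names are illustrative) instantiates a second copy of a known store and mounts it at its location. A rough sketch of the key split and the prefix routing it feeds, with strings.HasPrefix standing in for the wrapper's ptrie:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// A "[redis2.tmp]" section in filer.toml arrives as the key "redis2.tmp".
	key := "redis2.tmp"
	parts := strings.Split(key, ".")
	storeName, storeId := parts[0], parts[1]
	fmt.Println(storeName, storeId) // redis2 tmp

	// Requests are then routed by matching path prefix.
	location := "/tmp"
	for _, p := range []string{"/tmp/cache/a.txt", "/data/b.txt"} {
		if strings.HasPrefix(p, location) {
			fmt.Println(p, "-> store", key)
		} else {
			fmt.Println(p, "-> default store")
		}
	}
}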
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 800dd35dc..920d79da5 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + util.Join(dirParts[:i]...)
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // check the store directly
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: util.FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
- glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.maybeAddBucket(dirEntry)
- f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
- }
-
- } else if !dirEntry.IsDirectory() {
- glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
@@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
*/
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
if oldEntry == nil {
+
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
@@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
+func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
+
+ if level == 0 {
+ return nil
+ }
+
+ dirPath := "/" + util.Join(dirParts[:level]...)
+ // fmt.Printf("%d directory: %+v\n", i, dirPath)
+
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
+
+ // no such existing directory
+ if dirEntry == nil {
+
+ // ensure parent directory
+ if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
+ // create the directory
+ now := time.Now()
+
+ dirEntry = &Entry{
+ FullPath: util.FullPath(dirPath),
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
+ },
+ }
+
+ glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+ mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
+ if mkdirErr != nil {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+ return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
+ }
+ } else {
+ f.maybeAddBucket(dirEntry)
+ f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
+ }
+
+ } else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+ return fmt.Errorf("%s is a file", dirPath)
+ }
+
+ return nil
+}
+
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
@@ -280,23 +281,6 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) {
- if strings.HasSuffix(string(p), "/") && len(p) > 1 {
- p = p[0 : len(p)-1]
- }
-
- var makeupEntries []*Entry
- entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
- for expiredCount > 0 && err == nil {
- makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
- if err == nil {
- entries = append(entries, makeupEntries...)
- }
- }
-
- return entries, err
-}
-
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
if listErr != nil {
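The mkdir-all loop is now recursion that runs only for brand-new entries (oldEntry == nil): each level checks its own directory and, only on a miss, first ensures the level above, so an existing ancestor short-circuits the walk. A small self-contained model of that control flow (the map stands in for the filer store):

package main

import (
	"fmt"
	"path"
	"strings"
)

var existing = map[string]bool{"/a": true} // pretend /a is already stored

// ensureParents mirrors ensureParentDirecotryEntry: on a missing directory,
// recurse to ensure its parent first, then "create" it.
func ensureParents(dirParts []string, level int) {
	if level == 0 {
		return
	}
	dirPath := "/" + path.Join(dirParts[:level]...)
	if existing[dirPath] {
		return // an existing ancestor stops the upward walk
	}
	ensureParents(dirParts, level-1)
	fmt.Println("mkdir", dirPath)
	existing[dirPath] = true
}

func main() {
	dirParts := strings.Split("/a/b/c/f.txt", "/")
	ensureParents(dirParts, len(dirParts)-1)
	// Prints: mkdir /a/b, then mkdir /a/b/c — top-down, skipping /a.
}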
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 09af80b42..9eee38277 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -151,4 +151,3 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
}
f.DeleteChunks(toDelete)
}
-
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
new file mode 100644
index 000000000..b26959cb0
--- /dev/null
+++ b/weed/filer/filer_search.go
@@ -0,0 +1,76 @@
+package filer
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "path/filepath"
+ "strings"
+)
+
+func splitPattern(pattern string) (prefix string, restPattern string) {
+ position := strings.Index(pattern, "*")
+ if position >= 0 {
+ return pattern[:position], pattern[position:]
+ }
+ position = strings.Index(pattern, "?")
+ if position >= 0 {
+ return pattern[:position], pattern[position:]
+ }
+ return "", restPattern
+}
+
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) {
+ if strings.HasSuffix(string(p), "/") && len(p) > 1 {
+ p = p[0 : len(p)-1]
+ }
+
+ prefix, restNamePattern := splitPattern(namePattern)
+ var missedCount int
+ var lastFileName string
+
+ entries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern)
+
+ for missedCount > 0 && err == nil {
+ var makeupEntries []*Entry
+ makeupEntries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern)
+ for _, entry := range makeupEntries {
+ entries = append(entries, entry)
+ }
+ }
+
+ return entries, err
+}
+
+func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix, restNamePattern string) (matchedEntries []*Entry, missedCount int, lastFileName string, err error) {
+ var foundEntries []*Entry
+
+ foundEntries, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix)
+ if err != nil {
+ return
+ }
+ if len(restNamePattern) == 0 {
+ return foundEntries, 0, lastFileName, nil
+ }
+ for _, entry := range foundEntries {
+ nameToTest := strings.ToLower(entry.Name())
+ if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched {
+ matchedEntries = append(matchedEntries, entry)
+ } else {
+ missedCount++
+ }
+ }
+ return
+}
+
+func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, lastFileName string, err error) {
+ var makeupEntries []*Entry
+ var expiredCount int
+ entries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+ for expiredCount > 0 && err == nil {
+ makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
+ if err == nil {
+ entries = append(entries, makeupEntries...)
+ }
+ }
+ return
+}
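doListPatternMatchedEntries thus pushes the literal prefix down to the store and applies only the wildcard remainder in memory, matching it against the lowercased name with the prefix stripped. A standalone illustration of the split and the match, using the (fixed) splitPattern above:

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

func splitPattern(pattern string) (prefix string, restPattern string) {
	position := strings.Index(pattern, "*")
	if position >= 0 {
		return pattern[:position], pattern[position:]
	}
	position = strings.Index(pattern, "?")
	if position >= 0 {
		return pattern[:position], pattern[position:]
	}
	return pattern, "" // no wildcard: the whole pattern is a literal prefix
}

func main() {
	for _, pattern := range []string{"abc*.jpg", "a?c", "report"} {
		prefix, rest := splitPattern(pattern)
		fmt.Printf("%-9s -> prefix=%q rest=%q\n", pattern, prefix, rest)
	}

	// The store lists names starting with "abc"; the remainder is matched here.
	prefix, rest := splitPattern("abc*.jpg")
	name := strings.ToLower("abc_photo.JPG")
	matched, _ := filepath.Match(rest, name[len(prefix):])
	fmt.Println(matched) // true
}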
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 10aee75df..f1e6c6c35 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -3,22 +3,14 @@ package filer
import (
"context"
"errors"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/viant/ptrie"
- "strings"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
- ErrKvNotImplemented = errors.New("kv not implemented yet")
- ErrKvNotFound = errors.New("kv: not found")
-
- _ = VirtualFilerStore(&FilerStoreWrapper{})
+ ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
+ ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
+ ErrKvNotImplemented = errors.New("kv not implemented yet")
+ ErrKvNotFound = errors.New("kv: not found")
)
type FilerStore interface {
@@ -45,281 +37,3 @@ type FilerStore interface {
Shutdown()
}
-
-type VirtualFilerStore interface {
- FilerStore
- DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
- DeleteOneEntry(ctx context.Context, entry *Entry) error
- AddPathSpecificStore(path string, storeId string, store FilerStore)
-}
-
-type FilerStoreWrapper struct {
- defaultStore FilerStore
- pathToStore ptrie.Trie
- storeIdToStore map[string]FilerStore
-}
-
-func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
- if innerStore, ok := store.(*FilerStoreWrapper); ok {
- return innerStore
- }
- return &FilerStoreWrapper{
- defaultStore: store,
- pathToStore: ptrie.New(),
- storeIdToStore: make(map[string]FilerStore),
- }
-}
-
-func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
- fsw.storeIdToStore[storeId] = store
- err := fsw.pathToStore.Put([]byte(path), storeId)
- if err != nil {
- glog.Fatalf("put path specific store: %v", err)
- }
-}
-
-func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
- store = fsw.defaultStore
- if path == "" {
- return
- }
- var storeId string
- fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
- storeId = value.(string)
- return false
- })
- if storeId != "" {
- store = fsw.storeIdToStore[storeId]
- }
- return
-}
-
-func (fsw *FilerStoreWrapper) GetName() string {
- return fsw.getActualStore("").GetName()
-}
-
-func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
- return fsw.getActualStore("").Initialize(configuration, prefix)
-}
-
-func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
- actualStore := fsw.getActualStore(entry.FullPath)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
-
- if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
- return err
- }
-
- glog.V(4).Infof("InsertEntry %s", entry.FullPath)
- return actualStore.InsertEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
- actualStore := fsw.getActualStore(entry.FullPath)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
- }()
-
- filer_pb.BeforeEntrySerialization(entry.Chunks)
- if entry.Mime == "application/octet-stream" {
- entry.Mime = ""
- }
-
- if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
- return err
- }
-
- glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
- return actualStore.UpdateEntry(ctx, entry)
-}
-
-func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
- actualStore := fsw.getActualStore(fp)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("FindEntry %s", fp)
- entry, err = actualStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
-
- fsw.maybeReadHardLink(ctx, entry)
-
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- return
-}
-
-func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- actualStore := fsw.getActualStore(fp)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- existingEntry, findErr := fsw.FindEntry(ctx, fp)
- if findErr == filer_pb.ErrNotFound {
- return nil
- }
- if len(existingEntry.HardLinkId) != 0 {
- // remove hard link
- glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
- if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
- return err
- }
- }
-
- glog.V(4).Infof("DeleteEntry %s", fp)
- return actualStore.DeleteEntry(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
- actualStore := fsw.getActualStore(existingEntry.FullPath)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
- }()
-
- if len(existingEntry.HardLinkId) != 0 {
- // remove hard link
- glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
- if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
- return err
- }
- }
-
- glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
- return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
-}
-
-func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
- actualStore := fsw.getActualStore(fp)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("DeleteFolderChildren %s", fp)
- return actualStore.DeleteFolderChildren(ctx, fp)
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
- actualStore := fsw.getActualStore(dirPath)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
- }()
-
- glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
-}
-
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
- actualStore := fsw.getActualStore(dirPath)
- stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
- start := time.Now()
- defer func() {
- stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
- }()
- glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- if err == ErrUnsupportedListDirectoryPrefixed {
- entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- }
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, nil
-}
-
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
- actualStore := fsw.getActualStore(dirPath)
- entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
-
- if prefix == "" {
- return
- }
-
- count := 0
- var lastFileName string
- notPrefixed := entries
- entries = nil
- for count < limit && len(notPrefixed) > 0 {
- for _, entry := range notPrefixed {
- lastFileName = entry.Name()
- if strings.HasPrefix(entry.Name(), prefix) {
- count++
- entries = append(entries, entry)
- if count >= limit {
- break
- }
- }
- }
- if count < limit {
- notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
- return fsw.getActualStore("").BeginTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
- return fsw.getActualStore("").CommitTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
- return fsw.getActualStore("").RollbackTransaction(ctx)
-}
-
-func (fsw *FilerStoreWrapper) Shutdown() {
- fsw.getActualStore("").Shutdown()
-}
-
-func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- return fsw.getActualStore("").KvPut(ctx, key, value)
-}
-func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- return fsw.getActualStore("").KvGet(ctx, key)
-}
-func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
- return fsw.getActualStore("").KvDelete(ctx, key)
-}
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
index 144b7286e..316c76a0c 100644
--- a/weed/filer/filerstore_hardlink.go
+++ b/weed/filer/filerstore_hardlink.go
@@ -19,7 +19,8 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry
// check what is existing entry
glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
- existingEntry, err := fsw.getActualStore(entry.FullPath).FindEntry(ctx, entry.FullPath)
+ actualStore := fsw.getActualStore(entry.FullPath)
+ existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
}
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
new file mode 100644
index 000000000..ea0f9db77
--- /dev/null
+++ b/weed/filer/filerstore_translate_path.go
@@ -0,0 +1,161 @@
+package filer
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+)
+
+var (
+ _ = FilerStore(&FilerStorePathTranlator{})
+)
+
+type FilerStorePathTranlator struct {
+ actualStore FilerStore
+ storeRoot string
+}
+
+func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator {
+ if innerStore, ok := store.(*FilerStorePathTranlator); ok {
+ return innerStore
+ }
+
+ if !strings.HasSuffix(storeRoot, "/") {
+ storeRoot += "/"
+ }
+
+ return &FilerStorePathTranlator{
+ actualStore: store,
+ storeRoot: storeRoot,
+ }
+}
+
+func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) {
+ newPath = fp
+ if t.storeRoot == "/" {
+ return
+ }
+ newPath = fp[len(t.storeRoot)-1:]
+ if newPath == "" {
+ newPath = "/"
+ }
+ return
+}
+func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) {
+ previousPath = entry.FullPath
+ if t.storeRoot == "/" {
+ return
+ }
+ entry.FullPath = t.translatePath(previousPath)
+ return
+}
+func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) {
+ entry.FullPath = previousPath
+}
+
+func (t *FilerStorePathTranlator) GetName() string {
+ return t.actualStore.GetName()
+}
+
+func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error {
+ return t.actualStore.Initialize(configuration, prefix)
+}
+
+func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error {
+ previousPath := t.changeEntryPath(entry)
+ defer t.recoverEntryPath(entry, previousPath)
+
+ return t.actualStore.InsertEntry(ctx, entry)
+}
+
+func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error {
+ previousPath := t.changeEntryPath(entry)
+ defer t.recoverEntryPath(entry, previousPath)
+
+ return t.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+ if t.storeRoot == "/" {
+ return t.actualStore.FindEntry(ctx, fp)
+ }
+ newFullPath := t.translatePath(fp)
+ entry, err = t.actualStore.FindEntry(ctx, newFullPath)
+ if err == nil {
+ entry.FullPath = fp[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return
+}
+
+func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ newFullPath := t.translatePath(fp)
+ return t.actualStore.DeleteEntry(ctx, newFullPath)
+}
+
+func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+
+ previousPath := t.changeEntryPath(existingEntry)
+ defer t.recoverEntryPath(existingEntry, previousPath)
+
+ return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath)
+}
+
+func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ newFullPath := t.translatePath(fp)
+
+ return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
+}
+
+func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+
+ newFullPath := t.translatePath(dirPath)
+
+ entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return entries, err
+}
+
+func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+
+ newFullPath := t.translatePath(dirPath)
+
+ entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
+ }
+ return entries, nil
+}
+
+func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return t.actualStore.BeginTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error {
+ return t.actualStore.CommitTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error {
+ return t.actualStore.RollbackTransaction(ctx)
+}
+
+func (t *FilerStorePathTranlator) Shutdown() {
+ t.actualStore.Shutdown()
+}
+
+func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return t.actualStore.KvPut(ctx, key, value)
+}
+func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return t.actualStore.KvGet(ctx, key)
+}
+func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) {
+ return t.actualStore.KvDelete(ctx, key)
+}
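Concretely, the translator makes a mounted store see paths relative to its root: with storeRoot "/data/" (the constructor guarantees the trailing slash), "/data/sub/f.txt" is stored as "/sub/f.txt", and results read back get the root re-attached. A small standalone sketch of both directions:

package main

import "fmt"

// translate strips the store root, as in translatePath above: the root keeps
// its trailing slash, so root[:len(root)-1] is the prefix to cut or restore.
func translate(storeRoot, fp string) string {
	if storeRoot == "/" {
		return fp
	}
	newPath := fp[len(storeRoot)-1:]
	if newPath == "" {
		newPath = "/"
	}
	return newPath
}

func main() {
	storeRoot := "/data/"
	inner := translate(storeRoot, "/data/sub/f.txt")
	fmt.Println(inner) // /sub/f.txt

	// Reading back, entries are re-prefixed with the mount point, mirroring
	// entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath.
	outer := storeRoot[:len(storeRoot)-1] + inner
	fmt.Println(outer) // /data/sub/f.txt
}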
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
new file mode 100644
index 000000000..3206d5ba4
--- /dev/null
+++ b/weed/filer/filerstore_wrapper.go
@@ -0,0 +1,299 @@
+package filer
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/viant/ptrie"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ _ = VirtualFilerStore(&FilerStoreWrapper{})
+)
+
+type VirtualFilerStore interface {
+ FilerStore
+ DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
+ DeleteOneEntry(ctx context.Context, entry *Entry) error
+ AddPathSpecificStore(path string, storeId string, store FilerStore)
+}
+
+type FilerStoreWrapper struct {
+ defaultStore FilerStore
+ pathToStore ptrie.Trie
+ storeIdToStore map[string]FilerStore
+}
+
+func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
+ if innerStore, ok := store.(*FilerStoreWrapper); ok {
+ return innerStore
+ }
+ return &FilerStoreWrapper{
+ defaultStore: store,
+ pathToStore: ptrie.New(),
+ storeIdToStore: make(map[string]FilerStore),
+ }
+}
+
+func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
+ fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
+ err := fsw.pathToStore.Put([]byte(path), storeId)
+ if err != nil {
+ glog.Fatalf("put path specific store: %v", err)
+ }
+}
+
+func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
+ store = fsw.defaultStore
+ if path == "/" {
+ return
+ }
+ var storeId string
+ fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool {
+ storeId = value.(string)
+ return false
+ })
+ if storeId != "" {
+ store = fsw.storeIdToStore[storeId]
+ }
+ return
+}
+
+func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) {
+ return fsw.defaultStore
+}
+
+func (fsw *FilerStoreWrapper) GetName() string {
+ return fsw.getDefaultStore().GetName()
+}
+
+func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
+ return fsw.getDefaultStore().Initialize(configuration, prefix)
+}
+
+func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
+ actualStore := fsw.getActualStore(entry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
+ }
+
+ glog.V(4).Infof("InsertEntry %s", entry.FullPath)
+ return actualStore.InsertEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
+ actualStore := fsw.getActualStore(entry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
+ }()
+
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
+ if entry.Mime == "application/octet-stream" {
+ entry.Mime = ""
+ }
+
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
+ }
+
+ glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
+ return actualStore.UpdateEntry(ctx, entry)
+}
+
+func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+ actualStore := fsw.getActualStore(fp)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("FindEntry %s", fp)
+ entry, err = actualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+
+ fsw.maybeReadHardLink(ctx, entry)
+
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
+}
+
+func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fp)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ existingEntry, findErr := fsw.FindEntry(ctx, fp)
+ if findErr == filer_pb.ErrNotFound {
+ return nil
+ }
+ if findErr != nil {
+ // avoid dereferencing a nil entry when the lookup itself failed
+ return findErr
+ }
+ if len(existingEntry.HardLinkId) != 0 {
+ // remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+
+ glog.V(4).Infof("DeleteEntry %s", fp)
+ return actualStore.DeleteEntry(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+ actualStore := fsw.getActualStore(existingEntry.FullPath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ if len(existingEntry.HardLinkId) != 0 {
+ // remove hard link
+ glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath)
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+
+ glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
+ return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
+}
+
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fp + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("DeleteFolderChildren %s", fp)
+ return actualStore.DeleteFolderChildren(ctx, fp)
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
+ entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
+}
+
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
+ }()
+ glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
+ entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ if err == ErrUnsupportedListDirectoryPrefixed {
+ entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ }
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, nil
+}
+
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
+ actualStore := fsw.getActualStore(dirPath + "/")
+ entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+
+ if prefix == "" {
+ return
+ }
+
+ count := 0
+ var lastFileName string
+ notPrefixed := entries
+ entries = nil
+ for count < limit && len(notPrefixed) > 0 {
+ for _, entry := range notPrefixed {
+ lastFileName = entry.Name()
+ if strings.HasPrefix(entry.Name(), prefix) {
+ count++
+ entries = append(entries, entry)
+ if count >= limit {
+ break
+ }
+ }
+ }
+ if count < limit {
+ notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
+
+func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return fsw.getDefaultStore().BeginTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
+ return fsw.getDefaultStore().CommitTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
+ return fsw.getDefaultStore().RollbackTransaction(ctx)
+}
+
+func (fsw *FilerStoreWrapper) Shutdown() {
+ fsw.getDefaultStore().Shutdown()
+}
+
+func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return fsw.getDefaultStore().KvPut(ctx, key, value)
+}
+func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return fsw.getDefaultStore().KvGet(ctx, key)
+}
+func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
+ return fsw.getDefaultStore().KvDelete(ctx, key)
+}
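How the pieces above compose, as a minimal sketch (assuming the constructor at the top of this file is named NewFilerStoreWrapper; the store values and paths here are hypothetical):

    // sketch: defaultStore and bucketStore stand in for any two
    // FilerStore implementations; ctx is a context.Context
    fsw := NewFilerStoreWrapper(defaultStore)
    fsw.AddPathSpecificStore("/buckets/large", "store-1", bucketStore)

    // "/buckets/large/o1" matches the registered prefix, so this call is
    // served by bucketStore (via the path translator from
    // filerstore_translate_path.go)
    entry, err := fsw.FindEntry(ctx, util.FullPath("/buckets/large/o1"))

    // no registered prefix matches, so the default store serves this
    other, otherErr := fsw.FindEntry(ctx, util.FullPath("/tmp/o2"))

Note that transactions and the Kv* methods always go to the default store, so path-specific stores only participate in entry-level operations.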
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
new file mode 100644
index 000000000..6b0ad58b9
--- /dev/null
+++ b/weed/filer/hbase/hbase_store.go
@@ -0,0 +1,227 @@
+package hbase
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tsuna/gohbase"
+ "github.com/tsuna/gohbase/hrpc"
+ "io"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &HbaseStore{})
+}
+
+type HbaseStore struct {
+ Client gohbase.Client
+ table []byte
+ cfKv string
+ cfMetaDir string
+ column string
+}
+
+func (store *HbaseStore) GetName() string {
+ return "hbase"
+}
+
+func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"zkquorum"),
+ configuration.GetString(prefix+"table"),
+ )
+}
+
+func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
+ store.Client = gohbase.NewClient(zkquorum)
+ store.table = []byte(table)
+ store.cfKv = "kv"
+ store.cfMetaDir = "meta"
+ store.column = "a"
+
+ // check table exists
+ key := "whatever"
+ headers := map[string][]string{store.cfMetaDir: nil}
+ get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
+ if err != nil {
+ return fmt.Errorf("NewGet returned an error: %v", err)
+ }
+ _, err = store.Client.Get(get)
+ if err == nil {
+ return nil // table exists
+ }
+ if err != gohbase.TableNotFound {
+ return fmt.Errorf("checking table %s: %v", table, err)
+ }
+
+ // create table
+ adminClient := gohbase.NewAdminClient(zkquorum)
+ cFamilies := []string{store.cfKv, store.cfMetaDir}
+ cf := make(map[string]map[string]string, len(cFamilies))
+ for _, f := range cFamilies {
+ cf[f] = nil
+ }
+ ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
+ if err := adminClient.CreateTable(ct); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
+ return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
+}
+
+func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
+ value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
+ if err != nil {
+ if err == filer.ErrKvNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ return nil, err
+ }
+
+ entry = &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
+ return store.doDelete(ctx, store.cfMetaDir, []byte(path))
+}
+
+func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
+
+ family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
+ expectedPrefix := []byte(path.Child(""))
+ scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
+ if err != nil {
+ return err
+ }
+
+ scanner := store.Client.Scan(scan)
+ defer scanner.Close()
+ for {
+ res, err := scanner.Next()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ if len(res.Cells) == 0 {
+ continue
+ }
+ cell := res.Cells[0]
+
+ if !bytes.HasPrefix(cell.Row, expectedPrefix) {
+ break
+ }
+ fullpath := util.FullPath(cell.Row)
+ dir, _ := fullpath.DirAndName()
+ if dir != string(path) {
+ continue
+ }
+
+ if err = store.doDelete(ctx, store.cfMetaDir, cell.Row); err != nil {
+ return err
+ }
+
+ }
+ return
+}
+
+func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+}
+
+func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
+ family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
+ expectedPrefix := []byte(dirPath.Child(prefix))
+ scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
+ if err != nil {
+ return nil, err
+ }
+
+ var entries []*filer.Entry
+ scanner := store.Client.Scan(scan)
+ defer scanner.Close()
+ for {
+ res, err := scanner.Next()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return entries, err
+ }
+ if len(res.Cells) == 0 {
+ continue
+ }
+ cell := res.Cells[0]
+
+ if !bytes.HasPrefix(cell.Row, expectedPrefix) {
+ break
+ }
+
+ fullpath := util.FullPath(cell.Row)
+ dir, fileName := fullpath.DirAndName()
+ if dir != string(dirPath) {
+ continue
+ }
+
+ value := cell.Value
+
+ if fileName == startFileName && !includeStartFile {
+ continue
+ }
+
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: fullpath,
+ }
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
+ glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
+ break
+ }
+ entries = append(entries, entry)
+ }
+
+ return entries, nil
+}
+
+func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+
+func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *HbaseStore) Shutdown() {
+ store.Client.Close()
+}
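Initialize reads only two keys, so the corresponding filer.toml section presumably looks like this (a sketch; the `enabled` flag and section name follow the usual per-store convention, and the actual scaffold text lives in weed/command/scaffold.go, outside this excerpt):

    [hbase]
    enabled = true
    zkquorum = "zk1:2181,zk2:2181"
    table = "seaweedfs"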
diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go
new file mode 100644
index 000000000..26bf763e2
--- /dev/null
+++ b/weed/filer/hbase/hbase_store_kv.go
@@ -0,0 +1,75 @@
+package hbase
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tsuna/gohbase/hrpc"
+ "time"
+)
+
+const (
+ COLUMN_NAME = "a"
+)
+
+func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ return store.doPut(ctx, store.cfKv, key, value, 0)
+}
+
+func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ return store.doGet(ctx, store.cfKv, key)
+}
+
+func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ return store.doDelete(ctx, store.cfKv, key)
+}
+
+func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) {
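+ // AsyncWal trades a small durability window for write throughput; a
+ // positive ttlSecond becomes a per-cell HBase TTL on the written column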
+ if ttlSecond > 0 {
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second))
+ }
+ return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal))
+}
+
+func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) {
+ values := map[string]map[string][]byte{cf: map[string][]byte{}}
+ values[cf][COLUMN_NAME] = value
+ putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...)
+ if err != nil {
+ return err
+ }
+ _, err = store.Client.Put(putRequest)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (value []byte, err error) {
+ family := map[string][]string{cf: {COLUMN_NAME}}
+ getRequest, err := hrpc.NewGet(ctx, store.table, key, hrpc.Families(family))
+ if err != nil {
+ return nil, err
+ }
+ getResp, err := store.Client.Get(getRequest)
+ if err != nil {
+ return nil, err
+ }
+ if len(getResp.Cells) == 0 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return getResp.Cells[0].Value, nil
+}
+
+func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) {
+ values := map[string]map[string][]byte{cf: map[string][]byte{}}
+ values[cf][COLUMN_NAME] = nil
+ deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal))
+ if err != nil {
+ return err
+ }
+ _, err = store.Client.Delete(deleteRequest)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 548473116..1f78057ef 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -82,8 +82,8 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
if err == filer_pb.ErrNotFound {
err = filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
- Directory: dir,
- Entry: &filer_pb.Entry{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
@@ -92,7 +92,7 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
FileMode: uint32(0644),
Collection: "",
Replication: "",
- FileSize: uint64(len(content)),
+ FileSize: uint64(len(content)),
},
Content: content,
},
@@ -103,10 +103,10 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
entry.Attributes.Mtime = time.Now().Unix()
entry.Attributes.FileSize = uint64(len(content))
err = filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: entry,
+ Directory: dir,
+ Entry: entry,
})
}
return err
-} \ No newline at end of file
+}
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
index d155dbe88..c7742bb19 100644
--- a/weed/filer/redis2/redis_cluster_store.go
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr
configuration.GetString(prefix+"password"),
configuration.GetBool(prefix+"useReadOnly"),
configuration.GetBool(prefix+"routeByLatency"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
store.Client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addresses,
Password: password,
ReadOnly: readOnly,
RouteByLatency: routeByLatency,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index ed04c817b..da404ed4c 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st
configuration.GetString(prefix+"address"),
configuration.GetString(prefix+"password"),
configuration.GetInt(prefix+"database"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
)
}
-func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) {
+func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
store.Client = redis.NewClient(&redis.Options{
Addr: hostPort,
Password: password,
DB: database,
})
+ store.loadSuperLargeDirectories(superLargeDirectories)
return
}
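Both redis2 variants read the same new key, so the configuration presumably looks like this (a sketch; the exact scaffold wording is in weed/command/scaffold.go, outside this excerpt):

    [redis2]
    enabled = true
    address = "localhost:6379"
    password = ""
    database = 0
    superLargeDirectories = ["/big/dir"]    # example path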
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 0374314c0..00d02ea14 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -18,7 +18,21 @@ const (
)
type UniversalRedis2Store struct {
- Client redis.UniversalClient
+ Client redis.UniversalClient
+ superLargeDirectoryHash map[string]bool
+}
+
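+// A directory listed in superLargeDirectories skips the per-directory
+// sorted set that tracks children: entries under it are still stored and
+// found by full path, but the directory cannot be listed and
+// DeleteFolderChildren becomes a no-op.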
+func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
+ _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) {
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]bool)
+ for _, dir := range superLargeDirectories {
+ store.superLargeDirectoryHash[dir] = true
+ }
}
func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) {
@@ -47,6 +61,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
dir, name := entry.FullPath.DirAndName()
+ if store.isSuperLargeDirectory(dir) {
+ return nil
+ }
+
if name != "" {
if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
@@ -96,6 +114,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
}
dir, name := fullpath.DirAndName()
+ if store.isSuperLargeDirectory(dir) {
+ return nil
+ }
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
@@ -108,6 +129,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ if store.isSuperLargeDirectory(string(fullpath)) {
+ return nil
+ }
+
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index da0a38dbf..b8af6381a 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -3,6 +3,7 @@ package s3api
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"io/ioutil"
"net/http"
@@ -79,7 +80,6 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []b
return nil
}
-
func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error {
var identities []*Identity
for _, ident := range config.Identities {
@@ -186,7 +186,6 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
return identity, s3err.ErrNotImplemented
}
- glog.V(3).Infof("auth error: %v", s3Err)
if s3Err != s3err.ErrNone {
return identity, s3Err
}
@@ -203,6 +202,44 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
}
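+// authUser authenticates the request and resolves the identity, but unlike
+// authRequest it performs no per-action authorization; callers such as
+// ListBucketsHandler filter results per identity instead.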
+func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err.ErrorCode) {
+ var identity *Identity
+ var s3Err s3err.ErrorCode
+ var found bool
+ switch getRequestAuthType(r) {
+ case authTypeStreamingSigned:
+ return identity, s3err.ErrNone
+ case authTypeUnknown:
+ glog.V(3).Infof("unknown auth type")
+ return identity, s3err.ErrAccessDenied
+ case authTypePresignedV2, authTypeSignedV2:
+ glog.V(3).Infof("v2 auth type")
+ identity, s3Err = iam.isReqAuthenticatedV2(r)
+ case authTypeSigned, authTypePresigned:
+ glog.V(3).Infof("v4 auth type")
+ identity, s3Err = iam.reqSignatureV4Verify(r)
+ case authTypePostPolicy:
+ glog.V(3).Infof("post policy auth type")
+ return identity, s3err.ErrNone
+ case authTypeJWT:
+ glog.V(3).Infof("jwt auth type")
+ return identity, s3err.ErrNotImplemented
+ case authTypeAnonymous:
+ identity, found = iam.lookupAnonymous()
+ if !found {
+ return identity, s3err.ErrAccessDenied
+ }
+ default:
+ return identity, s3err.ErrNotImplemented
+ }
+
+ glog.V(3).Infof("auth error: %v", s3Err)
+ if s3Err != s3err.ErrNone {
+ return identity, s3Err
+ }
+ return identity, s3err.ErrNone
+}
+
func (identity *Identity) canDo(action Action, bucket string) bool {
if identity.isAdmin() {
return true
@@ -216,10 +253,14 @@ func (identity *Identity) canDo(action Action, bucket string) bool {
return false
}
limitedByBucket := string(action) + ":" + bucket
+ adminLimitedByBucket := s3_constants.ACTION_ADMIN + ":" + bucket
for _, a := range identity.Actions {
if string(a) == limitedByBucket {
return true
}
+ if string(a) == adminLimitedByBucket {
+ return true
+ }
}
return false
}
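The added branch makes a bucket-scoped Admin grant imply every other action on that bucket. A sketch with a hypothetical identity:

    identity := &Identity{
        Name:    "bob",
        Actions: []Action{"Admin:images"},
    }
    identity.canDo("Read", "images") // true: Admin:images covers the bucket
    identity.canDo("Read", "videos") // false: no grant for this bucket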
diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go
index a960d2370..6614b0af0 100644
--- a/weed/s3api/http/header.go
+++ b/weed/s3api/http/header.go
@@ -31,6 +31,6 @@ const (
// Non-Standard S3 HTTP request constants
const (
- AmzIdentityId = "x-amz-identity-id"
- AmzIsAdmin = "x-amz-is-admin" // only set to http request header as a context
+ AmzIdentityId = "s3-identity-id"
+ AmzIsAdmin = "s3-is-admin" // only set to http request header as a context
)
diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go
index f6056ef78..4e484ac98 100644
--- a/weed/s3api/s3_constants/s3_actions.go
+++ b/weed/s3api/s3_constants/s3_actions.go
@@ -7,4 +7,3 @@ const (
ACTION_TAGGING = "Tagging"
ACTION_LIST = "List"
)
-
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 00b7382cc..f750f6e53 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/xml"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"math"
"net/http"
"time"
@@ -26,6 +27,16 @@ type ListAllMyBucketsResult struct {
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
+ var identity *Identity
+ var s3Err s3err.ErrorCode
+ if s3a.iam.isEnabled() {
+ identity, s3Err = s3a.iam.authUser(r)
+ if s3Err != s3err.ErrNone {
+ writeErrorResponse(w, s3Err, r.URL)
+ return
+ }
+ }
+
var response ListAllMyBucketsResult
entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
@@ -40,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
- if !s3a.hasAccess(r, entry) {
+ if identity != nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) {
continue
}
buckets = append(buckets, &s3.Bucket{
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index dce2fd6b0..fb1497b78 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -264,8 +264,10 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
}
} else {
var isEmpty bool
- if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
- return
+ if !s3a.option.AllowEmptyFolder {
+ var emptyErr error
+ if isEmpty, emptyErr = s3a.isDirectoryAllEmpty(client, dir, entry.Name); emptyErr != nil {
+ glog.Errorf("check empty folder %s: %v", dir, emptyErr)
+ }
+ }
}
if !isEmpty {
eachEntryFn(dir, entry)
@@ -310,13 +312,17 @@ func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string,
func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) {
// println("+ isDirectoryAllEmpty", dir, name)
+ glog.V(4).Infof("+ isEmpty %s/%s", parentDir, name)
+ defer glog.V(4).Infof("- isEmpty %s/%s %v", parentDir, name, isEmpty)
var fileCounter int
var subDirs []string
currentDir := parentDir + "/" + name
var startFrom string
var isExhausted bool
- for fileCounter == 0 && !isExhausted {
+ var foundEntry bool
+ for fileCounter == 0 && !isExhausted && err == nil {
+ foundEntry = false // reset per page so an empty page can end the loop
err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ foundEntry = true
if entry.IsDirectory {
subDirs = append(subDirs, entry.Name)
} else {
@@ -324,8 +330,12 @@ func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerCli
}
startFrom = entry.Name
isExhausted = isExhausted || isLast
+ glog.V(4).Infof(" * %s/%s isLast: %t", currentDir, startFrom, isLast)
return nil
}, startFrom, false, 8)
+ if !foundEntry {
+ break
+ }
}
if err != nil {
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 93e2bb575..4993104ae 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -20,6 +20,7 @@ type S3ApiServerOption struct {
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
+ AllowEmptyFolder bool
}
type S3ApiServer struct {
@@ -128,7 +129,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
}
// ListBuckets
- apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST"))
+ apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.ListBucketsHandler, "LIST"))
// NotFound
apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler)
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5f1b2d819..38472df2a 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -326,7 +326,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
- if err != nil {
+ if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
}
return resp, nil
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index d04053df5..2da129ab2 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -23,6 +23,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
@@ -109,6 +110,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
+ } else {
+ glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index d55bf7cbb..4d61193ec 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
"io"
"mime"
"net/http"
+ "net/url"
"path/filepath"
"strconv"
"strings"
@@ -121,6 +122,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
setEtag(w, etag)
filename := entry.Name()
+ filename = url.QueryEscape(filename)
adjustHeaderContentDisposition(w, r, filename)
totalSize := int64(entry.Size())
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 99345550c..f303ba1d4 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -34,8 +34,9 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
}
lastFileName := r.FormValue("lastFileName")
+ namePattern := r.FormValue("namePattern")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "")
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 82c6c11b4..eee39152b 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -209,17 +209,36 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
- if assignErr != nil {
- return nil, nil, 0, assignErr, nil
+ data, err := ioutil.ReadAll(limitedReader)
+ if err != nil {
+ return nil, nil, 0, err, nil
}
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
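+ // (an upload failure often means a bad volume server, so each attempt
+ // asks for a fresh file id rather than reusing the previous one)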
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr, nil
+ }
- // upload the chunk to the volume server
- uploadResult, uploadErr, data := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ time.Sleep(251 * time.Millisecond)
+ continue
+ }
+ break
+ }
if uploadErr != nil {
return nil, nil, 0, uploadErr, nil
}
+
content = data
// if last chunk exhausted the reader exactly at the border
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index 497ef4f9e..06ae15c9e 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -86,7 +86,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
// check collection
if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") {
- return fmt.Errorf("one s3 bucket goes to one collection and not customizable.")
+ return fmt.Errorf("one s3 bucket goes to one collection and not customizable")
}
// check replication
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index 52d96e4c3..a512ffc4a 100644
--- a/weed/shell/command_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -12,29 +12,31 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketCreate{})
+ Commands = append(Commands, &commandS3BucketCreate{})
}
-type commandBucketCreate struct {
+type commandS3BucketCreate struct {
}
-func (c *commandBucketCreate) Name() string {
- return "bucket.create"
+func (c *commandS3BucketCreate) Name() string {
+ return "s3.bucket.create"
}
-func (c *commandBucketCreate) Help() string {
+func (c *commandS3BucketCreate) Help() string {
return `create a bucket with a given name
Example:
- bucket.create -name <bucket_name> -replication 001
+ s3.bucket.create -name <bucket_name> -replication 001
`
}
-func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name")
- replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+ replication := bucketCommand.String("replication", "", "replication setting for the bucket; "+
+ "if not set, it honors the value defined by the filer if one exists, "+
+ "otherwise the value defined on the master")
if err = bucketCommand.Parse(args); err != nil {
return nil
}
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 02790b9e2..a8d8c5c29 100644
--- a/weed/shell/command_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -9,24 +9,24 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketDelete{})
+ Commands = append(Commands, &commandS3BucketDelete{})
}
-type commandBucketDelete struct {
+type commandS3BucketDelete struct {
}
-func (c *commandBucketDelete) Name() string {
- return "bucket.delete"
+func (c *commandS3BucketDelete) Name() string {
+ return "s3.bucket.delete"
}
-func (c *commandBucketDelete) Help() string {
+func (c *commandS3BucketDelete) Help() string {
return `delete a bucket by a given name
- bucket.delete -name <bucket_name>
+ s3.bucket.delete -name <bucket_name>
`
}
-func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
bucketName := bucketCommand.String("name", "", "bucket name")
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index 2e446b6b2..4acf9a866 100644
--- a/weed/shell/command_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -11,23 +11,23 @@ import (
)
func init() {
- Commands = append(Commands, &commandBucketList{})
+ Commands = append(Commands, &commandS3BucketList{})
}
-type commandBucketList struct {
+type commandS3BucketList struct {
}
-func (c *commandBucketList) Name() string {
- return "bucket.list"
+func (c *commandS3BucketList) Name() string {
+ return "s3.bucket.list"
}
-func (c *commandBucketList) Help() string {
+func (c *commandS3BucketList) Help() string {
return `list all buckets
`
}
-func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
if err = bucketCommand.Parse(args); err != nil {
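With the rename, a shell session now reads (usage sketch):

    > s3.bucket.create -name my-bucket -replication 001
    > s3.bucket.list
    > s3.bucket.delete -name my-bucket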
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 7712c5eda..80a74c3e3 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -47,7 +47,7 @@ type Volume struct {
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation
- lastIoError error
+ lastIoError error
}
func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 34eee876d..a6efc630d 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -92,7 +92,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if err == nil && alsoLoadIndex {
// adjust for existing volumes with .idx together with .dat files
if v.dirIdx != v.dir {
- if util.FileExists(v.DataFileName()+".idx") {
+ if util.FileExists(v.DataFileName() + ".idx") {
v.dirIdx = v.dir
}
}
@@ -100,12 +100,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if v.noWriteOrDelete {
glog.V(0).Infoln("open to read file", v.FileName(".idx"))
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
- return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
+ return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
}
} else {
glog.V(1).Infoln("open to write file", v.FileName(".idx"))
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
+ return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
}
}
if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
@@ -115,7 +115,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if v.noWriteOrDelete || v.noWriteCanDelete {
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
- glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
+ glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
}
} else {
switch needleMapKind {
diff --git a/weed/util/config.go b/weed/util/config.go
index 47f433028..94e621e34 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -19,11 +19,11 @@ type Configuration interface {
func LoadConfiguration(configFileName string, required bool) (loaded bool) {
// find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+ viper.SetConfigName(configFileName) // name of config file (without extension)
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 89155e9a2..08d169545 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 18)
COMMIT = ""
)