diff options
47 files changed, 1266 insertions, 462 deletions
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 2e1e9c162..39108c30a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -11,13 +11,13 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - goos: [linux, windows, darwin, freebsd, netbsd, openbsd ] - goarch: ["386", amd64, arm] + goos: [linux, windows, darwin ] + goarch: [amd64, arm] exclude: - goarch: arm goos: darwin - - goarch: 386 - goos: freebsd + - goarch: arm + goos: windows steps: @@ -84,7 +84,7 @@ There is only 40 bytes of disk storage overhead for each file's metadata. It is SeaweedFS started by implementing [Facebook's Haystack design paper](http://www.usenix.org/event/osdi10/tech/full_papers/Beaver.pdf). Also, SeaweedFS implements erasure coding with ideas from [f4: Facebookâs Warm BLOB Storage System](https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-muralidhar.pdf) -On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Mongodb, Redis, Cassandra, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc. +On top of the object store, optional [Filer] can support directories and POSIX attributes. Filer is a separate linearly-scalable stateless server with customizable metadata stores, e.g., MySql, Postgres, Redis, Cassandra, HBase, Mongodb, Elastic Search, LevelDB, MemSql, TiDB, Etcd, CockroachDB, etc. [Back to TOC](#table-of-contents) @@ -365,7 +365,7 @@ The architectures are mostly the same. SeaweedFS aims to store and read files fa * SeaweedFS optimizes for small files, ensuring O(1) disk seek operation, and can also handle large files. * SeaweedFS statically assigns a volume id for a file. Locating file content becomes just a lookup of the volume id, which can be easily cached. -* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Cassandra, Mongodb, Redis, Elastic Search, MySql, Postgres, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized. +* SeaweedFS Filer metadata store can be any well-known and proven data stores, e.g., Redis, Cassandra, HBase, Mongodb, Elastic Search, MySql, Postgres, MemSql, TiDB, CockroachDB, Etcd etc, and is easy to customized. * SeaweedFS Volume server also communicates directly with clients via HTTP, supporting range queries, direct uploads, etc. | System | File Metadata | File Content Read| POSIX | REST API | Optimized for large number of small files | @@ -407,7 +407,7 @@ Ceph uses CRUSH hashing to automatically manage the data placement. SeaweedFS pl SeaweedFS is optimized for small files. Small files are stored as one continuous block of content, with at most 8 unused bytes between files. Small file access is O(1) disk read. -SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Mongodb, Redis, Elastic Search, Cassandra, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage. +SeaweedFS Filer uses off-the-shelf stores, such as MySql, Postgres, Mongodb, Redis, Elastic Search, Cassandra, HBase, MemSql, TiDB, CockroachCB, Etcd, to manage file directories. These stores are proven, scalable, and easier to manage. | SeaweedFS | comparable to Ceph | advantage | | ------------- | ------------- | ---------------- | diff --git a/docker/Makefile b/docker/Makefile index 8ab83ca18..c2e9a12e7 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -12,6 +12,9 @@ build: dev: build docker-compose -f local-dev-compose.yml -p seaweedfs up +k8s: build + docker-compose -f local-k8s-compose.yml -p seaweedfs up + dev_registry: build docker-compose -f local-registry-compose.yml -p seaweedfs up diff --git a/docker/local-k8s-compose.yml b/docker/local-k8s-compose.yml new file mode 100644 index 000000000..0dda89ca4 --- /dev/null +++ b/docker/local-k8s-compose.yml @@ -0,0 +1,65 @@ +version: '2' + +services: + master: + image: chrislusf/seaweedfs:local + ports: + - 9333:9333 + - 19333:19333 + command: "master -ip=master" + volume: + image: chrislusf/seaweedfs:local + ports: + - 8080:8080 + - 18080:18080 + command: "volume -mserver=master:9333 -port=8080 -ip=volume" + depends_on: + - master + mysql: + image: percona/percona-server:5.7 + ports: + - 3306:3306 + volumes: + - ./seaweedfs.sql:/docker-entrypoint-initdb.d/seaweedfs.sql + environment: + - MYSQL_ROOT_PASSWORD=secret + - MYSQL_DATABASE=seaweedfs + - MYSQL_PASSWORD=secret + - MYSQL_USER=seaweedfs + filer: + image: chrislusf/seaweedfs:local + ports: + - 8888:8888 + - 18888:18888 + environment: + - WEED_MYSQL_HOSTNAME=mysql + - WEED_MYSQL_PORT=3306 + - WEED_MYSQL_DATABASE=seaweedfs + - WEED_MYSQL_USERNAME=seaweedfs + - WEED_MYSQL_PASSWORD=secret + - WEED_MYSQL_ENABLED=true + - WEED_LEVELDB2_ENABLED=false + command: 'filer -master="master:9333"' + depends_on: + - master + - volume + - mysql + ingress: + image: jwilder/nginx-proxy + ports: + - "80:80" + volumes: + - /var/run/docker.sock:/tmp/docker.sock:ro + - /tmp/nginx:/etc/nginx/conf.d + s3: + image: chrislusf/seaweedfs:local + ports: + - 8333:8333 + command: 's3 -filer="filer:8888"' + depends_on: + - master + - volume + - filer + environment: + - VIRTUAL_HOST=s3 + - VIRTUAL_PORT=8333
\ No newline at end of file diff --git a/docker/seaweedfs.sql b/docker/seaweedfs.sql new file mode 100644 index 000000000..38ebc575c --- /dev/null +++ b/docker/seaweedfs.sql @@ -0,0 +1,12 @@ +CREATE DATABASE IF NOT EXISTS seaweedfs; +CREATE USER IF NOT EXISTS 'seaweedfs'@'%' IDENTIFIED BY 'secret'; +GRANT ALL PRIVILEGES ON seaweedfs_fast.* TO 'seaweedfs'@'%'; +FLUSH PRIVILEGES; +USE seaweedfs; +CREATE TABLE IF NOT EXISTS filemeta ( + dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field', + name VARCHAR(1000) COMMENT 'directory or file name', + directory TEXT COMMENT 'full path to parent directory', + meta LONGBLOB, + PRIMARY KEY (dirhash, name) +) DEFAULT CHARSET=utf8;
\ No newline at end of file @@ -68,6 +68,7 @@ require ( github.com/syndtr/goleveldb v1.0.0 github.com/tidwall/gjson v1.3.2 github.com/tidwall/match v1.0.1 + github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 github.com/valyala/bytebufferpool v1.0.0 github.com/viant/assertly v0.5.4 // indirect github.com/viant/ptrie v0.3.0 @@ -179,6 +179,8 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= +github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg= @@ -223,6 +225,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= @@ -539,6 +542,7 @@ github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhD github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ= github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -562,6 +566,8 @@ github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e h1:MRM5ITcdelLK2j1vwZ3Je0FKVCfqOLp5zO6trqMLYs0= github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDqSt+GTGFMVlhk3ULuV0y9ZmzeVGR4mloJI3M= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -621,6 +627,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365 h1:6iRwZdrFUzbcVYZwa8dXTIILGIxmmhjyUPJEcwzPGaU= +github.com/tsuna/gohbase v0.0.0-20201125011725-348991136365/go.mod h1:zj0GJHGvyf1ed3Jm/Tb4830c/ZKDq+YoLsCt2rGQuT0= github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -785,6 +793,7 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfru golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd h1:WgqgiQvkiZWz7XLhphjt2GI2GcGCTIZs9jqXMWmH+oc= golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -930,8 +939,14 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= +modernc.org/b v1.0.0 h1:vpvqeyp17ddcQWF29Czawql4lDdABCDRbXRAS4+aF2o= +modernc.org/b v1.0.0/go.mod h1:uZWcZfRj1BpYzfN9JTerzlNUnnPsV9O2ZA8JsRcubNg= +modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= +rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= +rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 7667d2815..7144b0016 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "2.16" + imageTag: "2.18" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/filer.go b/weed/command/filer.go index e72056893..146840e00 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -43,8 +43,6 @@ type FilerOptions struct { peers *string metricsHttpPort *int cacheToFilerLimit *int - - // default leveldb directory, used in "weed server" mode defaultLevelDbDirectory *string } @@ -67,6 +65,7 @@ func init() { f.peers = cmdFiler.Flag.String("peers", "", "all filers sharing the same filer store in comma separated ip:port list") f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.cacheToFilerLimit = cmdFiler.Flag.Int("cacheToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -75,6 +74,7 @@ func init() { filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file") filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file") filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file") + filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") } var cmdFiler = &Command{ @@ -92,6 +92,7 @@ var cmdFiler = &Command{ GET /path/to/ The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order. + If the "filer.toml" is not found, an embedded filer store will be craeted under "-defaultStoreDir". The example filer.toml configuration file can be generated by "weed scaffold -config=filer" @@ -127,10 +128,7 @@ func (fo *FilerOptions) startFiler() { publicVolumeMux = http.NewServeMux() } - defaultLevelDbDirectory := "./filerldb2" - if fo.defaultLevelDbDirectory != nil { - defaultLevelDbDirectory = util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") - } + defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") var peers []string if *fo.peers != "" { diff --git a/weed/command/s3.go b/weed/command/s3.go index ed5bb0b80..d8e3e306b 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -23,13 +23,14 @@ var ( ) type S3Options struct { - filer *string - port *int - config *string - domainName *string - tlsPrivateKey *string - tlsCertificate *string - metricsHttpPort *int + filer *string + port *int + config *string + domainName *string + tlsPrivateKey *string + tlsCertificate *string + metricsHttpPort *int + allowEmptyFolder *bool } func init() { @@ -41,6 +42,7 @@ func init() { s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file") s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file") s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") + s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders") } var cmdS3 = &Command{ @@ -181,6 +183,7 @@ func (s3opt *S3Options) startS3Server() bool { DomainName: *s3opt.domainName, BucketsPath: filerBucketsPath, GrpcDialOption: grpcDialOption, + AllowEmptyFolder: *s3opt.allowEmptyFolder, }) if s3ApiServer_err != nil { glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err) diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index 1ab763004..6cfd46427 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -138,12 +138,21 @@ hosts=[ ] username="" password="" +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] + +[hbase] +enabled = false +zkquorum = "" +table = "seaweedfs" [redis2] enabled = false address = "localhost:6379" password = "" database = 0 +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [redis_cluster2] enabled = false @@ -160,6 +169,8 @@ password = "" readOnly = true # automatically use the closest Redis server for reads routeByLatency = true +# This changes the data layout. Only add new directories. Removing/Updating will cause data loss. +superLargeDirectories = [] [etcd] enabled = false diff --git a/weed/command/server.go b/weed/command/server.go index 7e63f8e8a..bd25f94b1 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -113,6 +113,7 @@ func init() { s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file") + s3Options.allowEmptyFolder = cmdServer.Flag.Bool("s3.allowEmptyFolder", false, "allow empty folders") msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 81d105134..792a45ff4 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -34,7 +34,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by } _, err = res.RowsAffected() - if err != nil { + if err != nil { return fmt.Errorf("kv upsert no rows affected: %s", err) } return nil diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index ae8cb7a86..49f5625d9 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -16,8 +16,9 @@ func init() { } type CassandraStore struct { - cluster *gocql.ClusterConfig - session *gocql.Session + cluster *gocql.ClusterConfig + session *gocql.Session + superLargeDirectoryHash map[string]string } func (store *CassandraStore) GetName() string { @@ -30,10 +31,16 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) { +func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) { + dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) { store.cluster = gocql.NewCluster(hosts...) if username != "" && password != "" { store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password} @@ -44,6 +51,19 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string, usernam if err != nil { glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace) } + + // set directory hash + store.superLargeDirectoryHash = make(map[string]string) + existingHash := make(map[string]string) + for _, dir := range superLargeDirectories { + // adding dir hash to avoid duplicated names + dirHash := util.Md5String([]byte(dir))[:4] + store.superLargeDirectoryHash[dir] = dirHash + if existingDir, found := existingHash[dirHash]; found { + glog.Fatalf("directory %s has the same hash as %s", dir, existingDir) + } + existingHash[dirHash] = dir + } return } @@ -60,6 +80,10 @@ func (store *CassandraStore) RollbackTransaction(ctx context.Context) error { func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { dir, name := entry.FullPath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) @@ -86,6 +110,10 @@ func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } + var data []byte if err := store.session.Query( "SELECT meta FROM filemeta WHERE directory=? AND name=?", @@ -113,6 +141,9 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { dir, name := fullpath.DirAndName() + if dirHash, ok := store.isSuperLargeDirectory(dir); ok { + dir, name = dirHash+name, "" + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=? AND name=?", @@ -124,6 +155,9 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.Full } func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return nil // filer.ErrUnsupportedSuperLargeDirectoryListing + } if err := store.session.Query( "DELETE FROM filemeta WHERE directory=?", @@ -141,6 +175,10 @@ func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, f func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok { + return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing + } + cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?" if inclusive { cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?" diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go index 5f5dfed25..a6f18709e 100644 --- a/weed/filer/configuration.go +++ b/weed/filer/configuration.go @@ -25,7 +25,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err) } f.SetStore(store) - glog.V(0).Infof("configured filer for %s", store.GetName()) + glog.V(0).Infof("configured filer store to %s", store.GetName()) hasDefaultStoreConfigured = true break } @@ -59,12 +59,15 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) { parts := strings.Split(key, ".") storeName, storeId := parts[0], parts[1] - store := storeNames[storeName] + store, found := storeNames[storeName] + if !found { + continue + } store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(FilerStore) if err := store.Initialize(config, key+"."); err != nil { glog.Fatalf("Failed to initialize store for %s: %+v", key, err) } - location := config.GetString(key+".location") + location := config.GetString(key + ".location") if location == "" { glog.Errorf("path-specific filer store needs %s", key+".location") os.Exit(-1) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 800dd35dc..920d79da5 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } - dirParts := strings.Split(string(entry.FullPath), "/") - - // fmt.Printf("directory parts: %+v\n", dirParts) - - var lastDirectoryEntry *Entry - - for i := 1; i < len(dirParts); i++ { - dirPath := "/" + util.Join(dirParts[:i]...) - // fmt.Printf("%d directory: %+v\n", i, dirPath) - - // check the store directly - glog.V(4).Infof("find uncached directory: %s", dirPath) - dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) - - // no such existing directory - if dirEntry == nil { - - // create the directory - now := time.Now() - - dirEntry = &Entry{ - FullPath: util.FullPath(dirPath), - Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0110, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, - }, - } - - glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) - mkdirErr := f.Store.InsertEntry(ctx, dirEntry) - if mkdirErr != nil { - if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { - glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) - return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) - } - } else { - f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) - } - - } else if !dirEntry.IsDirectory() { - glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) - return fmt.Errorf("%s is a file", dirPath) - } - - // remember the direct parent directory entry - if i == len(dirParts)-1 { - lastDirectoryEntry = dirEntry - } - - } - - if lastDirectoryEntry == nil { - glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath) - return fmt.Errorf("parent folder not found: %v", entry.FullPath) - } + oldEntry, _ := f.FindEntry(ctx, entry.FullPath) /* if !hasWritePermission(lastDirectoryEntry, entry) { @@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } */ - oldEntry, _ := f.FindEntry(ctx, entry.FullPath) - if oldEntry == nil { + + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } + glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) if err := f.Store.InsertEntry(ctx, entry); err != nil { glog.Errorf("insert entry %s: %v", entry.FullPath, err) @@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr return nil } +func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) { + + if level == 0 { + return nil + } + + dirPath := "/" + util.Join(dirParts[:level]...) + // fmt.Printf("%d directory: %+v\n", i, dirPath) + + // check the store directly + glog.V(4).Infof("find uncached directory: %s", dirPath) + dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath)) + + // no such existing directory + if dirEntry == nil { + + // ensure parent directory + if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil { + return err + } + + // create the directory + now := time.Now() + + dirEntry = &Entry{ + FullPath: util.FullPath(dirPath), + Attr: Attr{ + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0110, + Uid: entry.Uid, + Gid: entry.Gid, + Collection: entry.Collection, + Replication: entry.Replication, + UserName: entry.UserName, + GroupNames: entry.GroupNames, + }, + } + + glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode) + mkdirErr := f.Store.InsertEntry(ctx, dirEntry) + if mkdirErr != nil { + if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound { + glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr) + return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) + } + } else { + f.maybeAddBucket(dirEntry) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) + } + + } else if !dirEntry.IsDirectory() { + glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath) + return fmt.Errorf("%s is a file", dirPath) + } + + return nil +} + func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) { if oldEntry != nil { entry.Attr.Crtime = oldEntry.Attr.Crtime @@ -280,23 +281,6 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } -func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) ([]*Entry, error) { - if strings.HasSuffix(string(p), "/") && len(p) > 1 { - p = p[0 : len(p)-1] - } - - var makeupEntries []*Entry - entries, expiredCount, lastFileName, err := f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix) - for expiredCount > 0 && err == nil { - makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix) - if err == nil { - entries = append(entries, makeupEntries...) - } - } - - return entries, err -} - func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) { listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix) if listErr != nil { diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index 09af80b42..9eee38277 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -151,4 +151,3 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { } f.DeleteChunks(toDelete) } - diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go new file mode 100644 index 000000000..b26959cb0 --- /dev/null +++ b/weed/filer/filer_search.go @@ -0,0 +1,76 @@ +package filer + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/util" + "path/filepath" + "strings" +) + +func splitPattern(pattern string) (prefix string, restPattern string) { + position := strings.Index(pattern, "*") + if position >= 0 { + return pattern[:position], pattern[position:] + } + position = strings.Index(pattern, "?") + if position >= 0 { + return pattern[:position], pattern[position:] + } + return "", restPattern +} + +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) { + if strings.HasSuffix(string(p), "/") && len(p) > 1 { + p = p[0 : len(p)-1] + } + + prefix, restNamePattern := splitPattern(namePattern) + var missedCount int + var lastFileName string + + entries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern) + + for missedCount > 0 && err == nil { + var makeupEntries []*Entry + makeupEntries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern) + for _, entry := range makeupEntries { + entries = append(entries, entry) + } + } + + return entries, err +} + +func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix, restNamePattern string) (matchedEntries []*Entry, missedCount int, lastFileName string, err error) { + var foundEntries []*Entry + + foundEntries, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix) + if err != nil { + return + } + if len(restNamePattern) == 0 { + return foundEntries, 0, lastFileName, nil + } + for _, entry := range foundEntries { + nameToTest := strings.ToLower(entry.Name()) + if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched { + matchedEntries = append(matchedEntries, entry) + } else { + missedCount++ + } + } + return +} + +func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, lastFileName string, err error) { + var makeupEntries []*Entry + var expiredCount int + entries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix) + for expiredCount > 0 && err == nil { + makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix) + if err == nil { + entries = append(entries, makeupEntries...) + } + } + return +} diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 10aee75df..f1e6c6c35 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,22 +3,14 @@ package filer import ( "context" "errors" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/viant/ptrie" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) var ( - ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") - ErrKvNotImplemented = errors.New("kv not implemented yet") - ErrKvNotFound = errors.New("kv: not found") - - _ = VirtualFilerStore(&FilerStoreWrapper{}) + ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing") + ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing") + ErrKvNotImplemented = errors.New("kv not implemented yet") + ErrKvNotFound = errors.New("kv: not found") ) type FilerStore interface { @@ -45,281 +37,3 @@ type FilerStore interface { Shutdown() } - -type VirtualFilerStore interface { - FilerStore - DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error - DeleteOneEntry(ctx context.Context, entry *Entry) error - AddPathSpecificStore(path string, storeId string, store FilerStore) -} - -type FilerStoreWrapper struct { - defaultStore FilerStore - pathToStore ptrie.Trie - storeIdToStore map[string]FilerStore -} - -func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { - if innerStore, ok := store.(*FilerStoreWrapper); ok { - return innerStore - } - return &FilerStoreWrapper{ - defaultStore: store, - pathToStore: ptrie.New(), - storeIdToStore: make(map[string]FilerStore), - } -} - -func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { - fsw.storeIdToStore[storeId] = store - err := fsw.pathToStore.Put([]byte(path), storeId) - if err != nil { - glog.Fatalf("put path specific store: %v", err) - } -} - -func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) { - store = fsw.defaultStore - if path == "" { - return - } - var storeId string - fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { - storeId = value.(string) - return false - }) - if storeId != "" { - store = fsw.storeIdToStore[storeId] - } - return -} - -func (fsw *FilerStoreWrapper) GetName() string { - return fsw.getActualStore("").GetName() -} - -func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { - return fsw.getActualStore("").Initialize(configuration, prefix) -} - -func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { - actualStore := fsw.getActualStore(entry.FullPath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - - if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { - return err - } - - glog.V(4).Infof("InsertEntry %s", entry.FullPath) - return actualStore.InsertEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { - actualStore := fsw.getActualStore(entry.FullPath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) - }() - - filer_pb.BeforeEntrySerialization(entry.Chunks) - if entry.Mime == "application/octet-stream" { - entry.Mime = "" - } - - if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { - return err - } - - glog.V(4).Infof("UpdateEntry %s", entry.FullPath) - return actualStore.UpdateEntry(ctx, entry) -} - -func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { - actualStore := fsw.getActualStore(fp) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("FindEntry %s", fp) - entry, err = actualStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - - fsw.maybeReadHardLink(ctx, entry) - - filer_pb.AfterEntryDeserialization(entry.Chunks) - return -} - -func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - actualStore := fsw.getActualStore(fp) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - existingEntry, findErr := fsw.FindEntry(ctx, fp) - if findErr == filer_pb.ErrNotFound { - return nil - } - if len(existingEntry.HardLinkId) != 0 { - // remove hard link - glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) - if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { - return err - } - } - - glog.V(4).Infof("DeleteEntry %s", fp) - return actualStore.DeleteEntry(ctx, fp) -} - -func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { - actualStore := fsw.getActualStore(existingEntry.FullPath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) - }() - - if len(existingEntry.HardLinkId) != 0 { - // remove hard link - glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) - if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { - return err - } - } - - glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) - return actualStore.DeleteEntry(ctx, existingEntry.FullPath) -} - -func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { - actualStore := fsw.getActualStore(fp) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("DeleteFolderChildren %s", fp) - return actualStore.DeleteFolderChildren(ctx, fp) -} - -func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { - actualStore := fsw.getActualStore(dirPath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) - }() - - glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) - entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - for _, entry := range entries { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, err -} - -func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { - actualStore := fsw.getActualStore(dirPath) - stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc() - start := time.Now() - defer func() { - stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) - }() - glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) - entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - if err == ErrUnsupportedListDirectoryPrefixed { - entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) - } - if err != nil { - return nil, err - } - for _, entry := range entries { - fsw.maybeReadHardLink(ctx, entry) - filer_pb.AfterEntryDeserialization(entry.Chunks) - } - return entries, nil -} - -func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) { - actualStore := fsw.getActualStore(dirPath) - entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) - if err != nil { - return nil, err - } - - if prefix == "" { - return - } - - count := 0 - var lastFileName string - notPrefixed := entries - entries = nil - for count < limit && len(notPrefixed) > 0 { - for _, entry := range notPrefixed { - lastFileName = entry.Name() - if strings.HasPrefix(entry.Name(), prefix) { - count++ - entries = append(entries, entry) - if count >= limit { - break - } - } - } - if count < limit { - notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) - if err != nil { - return - } - } - } - return -} - -func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { - return fsw.getActualStore("").BeginTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { - return fsw.getActualStore("").CommitTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { - return fsw.getActualStore("").RollbackTransaction(ctx) -} - -func (fsw *FilerStoreWrapper) Shutdown() { - fsw.getActualStore("").Shutdown() -} - -func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return fsw.getActualStore("").KvPut(ctx, key, value) -} -func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - return fsw.getActualStore("").KvGet(ctx, key) -} -func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { - return fsw.getActualStore("").KvDelete(ctx, key) -} diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go index 144b7286e..316c76a0c 100644 --- a/weed/filer/filerstore_hardlink.go +++ b/weed/filer/filerstore_hardlink.go @@ -19,7 +19,8 @@ func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry // check what is existing entry glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath) - existingEntry, err := fsw.getActualStore(entry.FullPath).FindEntry(ctx, entry.FullPath) + actualStore := fsw.getActualStore(entry.FullPath) + existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath) if err != nil && err != filer_pb.ErrNotFound { return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) } diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go new file mode 100644 index 000000000..ea0f9db77 --- /dev/null +++ b/weed/filer/filerstore_translate_path.go @@ -0,0 +1,161 @@ +package filer + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/util" + "strings" +) + +var ( + _ = FilerStore(&FilerStorePathTranlator{}) +) + +type FilerStorePathTranlator struct { + actualStore FilerStore + storeRoot string +} + +func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator { + if innerStore, ok := store.(*FilerStorePathTranlator); ok { + return innerStore + } + + if !strings.HasSuffix(storeRoot, "/") { + storeRoot += "/" + } + + return &FilerStorePathTranlator{ + actualStore: store, + storeRoot: storeRoot, + } +} + +func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) { + newPath = fp + if t.storeRoot == "/" { + return + } + newPath = fp[len(t.storeRoot)-1:] + if newPath == "" { + newPath = "/" + } + return +} +func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) { + previousPath = entry.FullPath + if t.storeRoot == "/" { + return + } + entry.FullPath = t.translatePath(previousPath) + return +} +func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) { + entry.FullPath = previousPath +} + +func (t *FilerStorePathTranlator) GetName() string { + return t.actualStore.GetName() +} + +func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error { + return t.actualStore.Initialize(configuration, prefix) +} + +func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error { + previousPath := t.changeEntryPath(entry) + defer t.recoverEntryPath(entry, previousPath) + + return t.actualStore.InsertEntry(ctx, entry) +} + +func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error { + previousPath := t.changeEntryPath(entry) + defer t.recoverEntryPath(entry, previousPath) + + return t.actualStore.UpdateEntry(ctx, entry) +} + +func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { + if t.storeRoot == "/" { + return t.actualStore.FindEntry(ctx, fp) + } + newFullPath := t.translatePath(fp) + entry, err = t.actualStore.FindEntry(ctx, newFullPath) + if err == nil { + entry.FullPath = fp[:len(t.storeRoot)-1] + entry.FullPath + } + return +} + +func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + newFullPath := t.translatePath(fp) + return t.actualStore.DeleteEntry(ctx, newFullPath) +} + +func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { + + previousPath := t.changeEntryPath(existingEntry) + defer t.recoverEntryPath(existingEntry, previousPath) + + return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath) +} + +func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + newFullPath := t.translatePath(fp) + + return t.actualStore.DeleteFolderChildren(ctx, newFullPath) +} + +func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + + newFullPath := t.translatePath(dirPath) + + entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath + } + return entries, err +} + +func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { + + newFullPath := t.translatePath(dirPath) + + entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix) + if err != nil { + return nil, err + } + for _, entry := range entries { + entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath + } + return entries, nil +} + +func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) { + return t.actualStore.BeginTransaction(ctx) +} + +func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error { + return t.actualStore.CommitTransaction(ctx) +} + +func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error { + return t.actualStore.RollbackTransaction(ctx) +} + +func (t *FilerStorePathTranlator) Shutdown() { + t.actualStore.Shutdown() +} + +func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return t.actualStore.KvPut(ctx, key, value) +} +func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return t.actualStore.KvGet(ctx, key) +} +func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) { + return t.actualStore.KvDelete(ctx, key) +} diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go new file mode 100644 index 000000000..3206d5ba4 --- /dev/null +++ b/weed/filer/filerstore_wrapper.go @@ -0,0 +1,299 @@ +package filer + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/viant/ptrie" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + _ = VirtualFilerStore(&FilerStoreWrapper{}) +) + +type VirtualFilerStore interface { + FilerStore + DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error + DeleteOneEntry(ctx context.Context, entry *Entry) error + AddPathSpecificStore(path string, storeId string, store FilerStore) +} + +type FilerStoreWrapper struct { + defaultStore FilerStore + pathToStore ptrie.Trie + storeIdToStore map[string]FilerStore +} + +func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { + if innerStore, ok := store.(*FilerStoreWrapper); ok { + return innerStore + } + return &FilerStoreWrapper{ + defaultStore: store, + pathToStore: ptrie.New(), + storeIdToStore: make(map[string]FilerStore), + } +} + +func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { + fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) + err := fsw.pathToStore.Put([]byte(path), storeId) + if err != nil { + glog.Fatalf("put path specific store: %v", err) + } +} + +func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) { + store = fsw.defaultStore + if path == "/" { + return + } + var storeId string + fsw.pathToStore.MatchPrefix([]byte(path), func(key []byte, value interface{}) bool { + storeId = value.(string) + return false + }) + if storeId != "" { + store = fsw.storeIdToStore[storeId] + } + return +} + +func (fsw *FilerStoreWrapper) getDefaultStore() (store FilerStore) { + return fsw.defaultStore +} + +func (fsw *FilerStoreWrapper) GetName() string { + return fsw.getDefaultStore().GetName() +} + +func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error { + return fsw.getDefaultStore().Initialize(configuration, prefix) +} + +func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error { + actualStore := fsw.getActualStore(entry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "insert").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "insert").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + + glog.V(4).Infof("InsertEntry %s", entry.FullPath) + return actualStore.InsertEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error { + actualStore := fsw.getActualStore(entry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "update").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "update").Observe(time.Since(start).Seconds()) + }() + + filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + + glog.V(4).Infof("UpdateEntry %s", entry.FullPath) + return actualStore.UpdateEntry(ctx, entry) +} + +func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) { + actualStore := fsw.getActualStore(fp) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "find").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("FindEntry %s", fp) + entry, err = actualStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + + fsw.maybeReadHardLink(ctx, entry) + + filer_pb.AfterEntryDeserialization(entry.Chunks) + return +} + +func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + actualStore := fsw.getActualStore(fp) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + existingEntry, findErr := fsw.FindEntry(ctx, fp) + if findErr == filer_pb.ErrNotFound { + return nil + } + if len(existingEntry.HardLinkId) != 0 { + // remove hard link + glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + + glog.V(4).Infof("DeleteEntry %s", fp) + return actualStore.DeleteEntry(ctx, fp) +} + +func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) { + actualStore := fsw.getActualStore(existingEntry.FullPath) + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) + }() + + if len(existingEntry.HardLinkId) != 0 { + // remove hard link + glog.V(4).Infof("DeleteHardLink %s", existingEntry.FullPath) + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + + glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath) + return actualStore.DeleteEntry(ctx, existingEntry.FullPath) +} + +func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + actualStore := fsw.getActualStore(fp + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("DeleteFolderChildren %s", fp) + return actualStore.DeleteFolderChildren(ctx, fp) +} + +func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) { + actualStore := fsw.getActualStore(dirPath + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds()) + }() + + glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit) + entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, err +} + +func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) { + actualStore := fsw.getActualStore(dirPath + "/") + stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc() + start := time.Now() + defer func() { + stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds()) + }() + glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit) + entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + if err == ErrUnsupportedListDirectoryPrefixed { + entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix) + } + if err != nil { + return nil, err + } + for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, nil +} + +func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) { + actualStore := fsw.getActualStore(dirPath + "/") + entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + + if prefix == "" { + return + } + + count := 0 + var lastFileName string + notPrefixed := entries + entries = nil + for count < limit && len(notPrefixed) > 0 { + for _, entry := range notPrefixed { + lastFileName = entry.Name() + if strings.HasPrefix(entry.Name(), prefix) { + count++ + entries = append(entries, entry) + if count >= limit { + break + } + } + } + if count < limit { + notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit) + if err != nil { + return + } + } + } + return +} + +func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) { + return fsw.getDefaultStore().BeginTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error { + return fsw.getDefaultStore().CommitTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error { + return fsw.getDefaultStore().RollbackTransaction(ctx) +} + +func (fsw *FilerStoreWrapper) Shutdown() { + fsw.getDefaultStore().Shutdown() +} + +func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return fsw.getDefaultStore().KvPut(ctx, key, value) +} +func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return fsw.getDefaultStore().KvGet(ctx, key) +} +func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { + return fsw.getDefaultStore().KvDelete(ctx, key) +} diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go new file mode 100644 index 000000000..6b0ad58b9 --- /dev/null +++ b/weed/filer/hbase/hbase_store.go @@ -0,0 +1,227 @@ +package hbase + +import ( + "bytes" + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tsuna/gohbase" + "github.com/tsuna/gohbase/hrpc" + "io" +) + +func init() { + filer.Stores = append(filer.Stores, &HbaseStore{}) +} + +type HbaseStore struct { + Client gohbase.Client + table []byte + cfKv string + cfMetaDir string + column string +} + +func (store *HbaseStore) GetName() string { + return "hbase" +} + +func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize( + configuration.GetString(prefix+"zkquorum"), + configuration.GetString(prefix+"table"), + ) +} + +func (store *HbaseStore) initialize(zkquorum, table string) (err error) { + store.Client = gohbase.NewClient(zkquorum) + store.table = []byte(table) + store.cfKv = "kv" + store.cfMetaDir = "meta" + store.column = "a" + + // check table exists + key := "whatever" + headers := map[string][]string{store.cfMetaDir: nil} + get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers)) + if err != nil { + return fmt.Errorf("NewGet returned an error: %v", err) + } + _, err = store.Client.Get(get) + if err != gohbase.TableNotFound { + return nil + } + + // create table + adminClient := gohbase.NewAdminClient(zkquorum) + cFamilies := []string{store.cfKv, store.cfMetaDir} + cf := make(map[string]map[string]string, len(cFamilies)) + for _, f := range cFamilies { + cf[f] = nil + } + ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf) + if err := adminClient.CreateTable(ct); err != nil { + return err + } + + return nil +} + +func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + + return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec) +} + +func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) { + value, err := store.doGet(ctx, store.cfMetaDir, []byte(path)) + if err != nil { + if err == filer.ErrKvNotFound { + return nil, filer_pb.ErrNotFound + } + return nil, err + } + + entry = &filer.Entry{ + FullPath: path, + } + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil +} + +func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) { + return store.doDelete(ctx, store.cfMetaDir, []byte(path)) +} + +func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) { + + family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} + expectedPrefix := []byte(path.Child("")) + scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) + if err != nil { + return err + } + + scanner := store.Client.Scan(scan) + defer scanner.Close() + for { + res, err := scanner.Next() + if err != nil { + break + } + if len(res.Cells) == 0 { + continue + } + cell := res.Cells[0] + + if !bytes.HasPrefix(cell.Row, expectedPrefix) { + break + } + fullpath := util.FullPath(cell.Row) + dir, _ := fullpath.DirAndName() + if dir != string(path) { + continue + } + + err = store.doDelete(ctx, store.cfMetaDir, cell.Row) + if err != nil { + break + } + + } + return +} + +func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "") +} + +func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) { + family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}} + expectedPrefix := []byte(dirPath.Child(prefix)) + scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family)) + if err != nil { + return nil, err + } + + var entries []*filer.Entry + scanner := store.Client.Scan(scan) + defer scanner.Close() + for { + res, err := scanner.Next() + if err == io.EOF { + break + } + if err != nil { + return entries, err + } + if len(res.Cells) == 0 { + continue + } + cell := res.Cells[0] + + if !bytes.HasPrefix(cell.Row, expectedPrefix) { + break + } + + fullpath := util.FullPath(cell.Row) + dir, fileName := fullpath.DirAndName() + if dir != string(dirPath) { + continue + } + + value := cell.Value + + if fileName == startFileName && !includeStartFile { + continue + } + + limit-- + if limit < 0 { + break + } + entry := &filer.Entry{ + FullPath: fullpath, + } + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + entries = append(entries, entry) + } + + return entries, nil +} + +func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *HbaseStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *HbaseStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *HbaseStore) Shutdown() { + store.Client.Close() +} diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go new file mode 100644 index 000000000..26bf763e2 --- /dev/null +++ b/weed/filer/hbase/hbase_store_kv.go @@ -0,0 +1,75 @@ +package hbase + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/tsuna/gohbase/hrpc" + "time" +) + +const( + COLUMN_NAME = "a" +) +func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + return store.doPut(ctx, store.cfKv, key, value, 0) +} + +func (store *HbaseStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + return store.doGet(ctx, store.cfKv, key) +} + +func (store *HbaseStore) KvDelete(ctx context.Context, key []byte) (err error) { + return store.doDelete(ctx, store.cfKv, key) +} + +func (store *HbaseStore) doPut(ctx context.Context, cf string, key, value []byte, ttlSecond int32) (err error) { + if ttlSecond > 0 { + return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal), hrpc.TTL(time.Duration(ttlSecond)*time.Second)) + } + return store.doPutWithOptions(ctx, cf, key, value, hrpc.Durability(hrpc.AsyncWal)) +} + +func (store *HbaseStore) doPutWithOptions(ctx context.Context, cf string, key, value []byte, options ...func(hrpc.Call) error) (err error) { + values := map[string]map[string][]byte{cf: map[string][]byte{}} + values[cf][COLUMN_NAME] = value + putRequest, err := hrpc.NewPut(ctx, store.table, key, values, options...) + if err != nil { + return err + } + _, err = store.Client.Put(putRequest) + if err != nil { + return err + } + return nil +} + +func (store *HbaseStore) doGet(ctx context.Context, cf string, key []byte) (value []byte, err error) { + family := map[string][]string{cf: {COLUMN_NAME}} + getRequest, err := hrpc.NewGet(context.Background(), store.table, key, hrpc.Families(family)) + if err != nil { + return nil, err + } + getResp, err := store.Client.Get(getRequest) + if err != nil { + return nil, err + } + if len(getResp.Cells) == 0 { + return nil, filer.ErrKvNotFound + } + + return getResp.Cells[0].Value, nil +} + +func (store *HbaseStore) doDelete(ctx context.Context, cf string, key []byte) (err error) { + values := map[string]map[string][]byte{cf: map[string][]byte{}} + values[cf][COLUMN_NAME] = nil + deleteRequest, err := hrpc.NewDel(ctx, store.table, key, values, hrpc.Durability(hrpc.AsyncWal)) + if err != nil { + return err + } + _, err = store.Client.Delete(deleteRequest) + if err != nil { + return err + } + return nil +} diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 548473116..1f78057ef 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -82,8 +82,8 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte if err == filer_pb.ErrNotFound { err = filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ - Directory: dir, - Entry: &filer_pb.Entry{ + Directory: dir, + Entry: &filer_pb.Entry{ Name: name, IsDirectory: false, Attributes: &filer_pb.FuseAttributes{ @@ -92,7 +92,7 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte FileMode: uint32(0644), Collection: "", Replication: "", - FileSize: uint64(len(content)), + FileSize: uint64(len(content)), }, Content: content, }, @@ -103,10 +103,10 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte entry.Attributes.Mtime = time.Now().Unix() entry.Attributes.FileSize = uint64(len(content)) err = filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: entry, + Directory: dir, + Entry: entry, }) } return err -}
\ No newline at end of file +} diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go index d155dbe88..c7742bb19 100644 --- a/weed/filer/redis2/redis_cluster_store.go +++ b/weed/filer/redis2/redis_cluster_store.go @@ -28,15 +28,17 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr configuration.GetString(prefix+"password"), configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"routeByLatency"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) { +func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: addresses, Password: password, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go index ed04c817b..da404ed4c 100644 --- a/weed/filer/redis2/redis_store.go +++ b/weed/filer/redis2/redis_store.go @@ -23,14 +23,16 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st configuration.GetString(prefix+"address"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *Redis2Store) initialize(hostPort string, password string, database int) (err error) { +func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { store.Client = redis.NewClient(&redis.Options{ Addr: hostPort, Password: password, DB: database, }) + store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 0374314c0..00d02ea14 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -18,7 +18,21 @@ const ( ) type UniversalRedis2Store struct { - Client redis.UniversalClient + Client redis.UniversalClient + superLargeDirectoryHash map[string]bool +} + +func (store *UniversalRedis2Store) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) { + _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir] + return +} + +func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectories []string) { + // set directory hash + store.superLargeDirectoryHash = make(map[string]bool) + for _, dir := range superLargeDirectories { + store.superLargeDirectoryHash[dir] = true + } } func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { @@ -47,6 +61,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer } dir, name := entry.FullPath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } + if name != "" { if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) @@ -96,6 +114,9 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti } dir, name := fullpath.DirAndName() + if store.isSuperLargeDirectory(dir) { + return nil + } if name != "" { _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() if err != nil { @@ -108,6 +129,10 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + if store.isSuperLargeDirectory(string(fullpath)) { + return nil + } + members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index da0a38dbf..b8af6381a 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -3,6 +3,7 @@ package s3api import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "io/ioutil" "net/http" @@ -79,7 +80,6 @@ func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []b return nil } - func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3ApiConfiguration) error { var identities []*Identity for _, ident := range config.Identities { @@ -186,7 +186,6 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) return identity, s3err.ErrNotImplemented } - glog.V(3).Infof("auth error: %v", s3Err) if s3Err != s3err.ErrNone { return identity, s3Err } @@ -203,6 +202,44 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) } +func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err.ErrorCode) { + var identity *Identity + var s3Err s3err.ErrorCode + var found bool + switch getRequestAuthType(r) { + case authTypeStreamingSigned: + return identity, s3err.ErrNone + case authTypeUnknown: + glog.V(3).Infof("unknown auth type") + return identity, s3err.ErrAccessDenied + case authTypePresignedV2, authTypeSignedV2: + glog.V(3).Infof("v2 auth type") + identity, s3Err = iam.isReqAuthenticatedV2(r) + case authTypeSigned, authTypePresigned: + glog.V(3).Infof("v4 auth type") + identity, s3Err = iam.reqSignatureV4Verify(r) + case authTypePostPolicy: + glog.V(3).Infof("post policy auth type") + return identity, s3err.ErrNone + case authTypeJWT: + glog.V(3).Infof("jwt auth type") + return identity, s3err.ErrNotImplemented + case authTypeAnonymous: + identity, found = iam.lookupAnonymous() + if !found { + return identity, s3err.ErrAccessDenied + } + default: + return identity, s3err.ErrNotImplemented + } + + glog.V(3).Infof("auth error: %v", s3Err) + if s3Err != s3err.ErrNone { + return identity, s3Err + } + return identity, s3err.ErrNone +} + func (identity *Identity) canDo(action Action, bucket string) bool { if identity.isAdmin() { return true @@ -216,10 +253,14 @@ func (identity *Identity) canDo(action Action, bucket string) bool { return false } limitedByBucket := string(action) + ":" + bucket + adminLimitedByBucket := s3_constants.ACTION_ADMIN + ":" + bucket for _, a := range identity.Actions { if string(a) == limitedByBucket { return true } + if string(a) == adminLimitedByBucket { + return true + } } return false } diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go index a960d2370..6614b0af0 100644 --- a/weed/s3api/http/header.go +++ b/weed/s3api/http/header.go @@ -31,6 +31,6 @@ const ( // Non-Standard S3 HTTP request constants const ( - AmzIdentityId = "x-amz-identity-id" - AmzIsAdmin = "x-amz-is-admin" // only set to http request header as a context + AmzIdentityId = "s3-identity-id" + AmzIsAdmin = "s3-is-admin" // only set to http request header as a context ) diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go index f6056ef78..4e484ac98 100644 --- a/weed/s3api/s3_constants/s3_actions.go +++ b/weed/s3api/s3_constants/s3_actions.go @@ -7,4 +7,3 @@ const ( ACTION_TAGGING = "Tagging" ACTION_LIST = "List" ) - diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 00b7382cc..f750f6e53 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -4,6 +4,7 @@ import ( "context" "encoding/xml" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "math" "net/http" "time" @@ -26,6 +27,16 @@ type ListAllMyBucketsResult struct { func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) { + var identity *Identity + var s3Err s3err.ErrorCode + if s3a.iam.isEnabled() { + identity, s3Err = s3a.iam.authUser(r) + if s3Err != s3err.ErrNone { + writeErrorResponse(w, s3Err, r.URL) + return + } + } + var response ListAllMyBucketsResult entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32) @@ -40,7 +51,7 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques var buckets []*s3.Bucket for _, entry := range entries { if entry.IsDirectory { - if !s3a.hasAccess(r, entry) { + if identity!=nil && !identity.canDo(s3_constants.ACTION_ADMIN, entry.Name) { continue } buckets = append(buckets, &s3.Bucket{ diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index dce2fd6b0..fb1497b78 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -264,8 +264,10 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } } else { var isEmpty bool - if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil { - return + if !s3a.option.AllowEmptyFolder { + if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil { + glog.Errorf("check empty folder %s: %v", dir, err) + } } if !isEmpty { eachEntryFn(dir, entry) @@ -310,13 +312,17 @@ func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) { // println("+ isDirectoryAllEmpty", dir, name) + glog.V(4).Infof("+ isEmpty %s/%s", parentDir, name) + defer glog.V(4).Infof("- isEmpty %s/%s %v", parentDir, name, isEmpty) var fileCounter int var subDirs []string currentDir := parentDir + "/" + name var startFrom string var isExhausted bool - for fileCounter == 0 && !isExhausted { + var foundEntry bool + for fileCounter == 0 && !isExhausted && err == nil { err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error { + foundEntry = true if entry.IsDirectory { subDirs = append(subDirs, entry.Name) } else { @@ -324,8 +330,12 @@ func (s3a *S3ApiServer) isDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerCli } startFrom = entry.Name isExhausted = isExhausted || isLast + glog.V(4).Infof(" * %s/%s isLast: %t", currentDir, startFrom, isLast) return nil }, startFrom, false, 8) + if !foundEntry { + break + } } if err != nil { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 93e2bb575..4993104ae 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -20,6 +20,7 @@ type S3ApiServerOption struct { DomainName string BucketsPath string GrpcDialOption grpc.DialOption + AllowEmptyFolder bool } type S3ApiServer struct { @@ -128,7 +129,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { } // ListBuckets - apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST")) + apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.ListBucketsHandler, "LIST")) // NotFound apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5f1b2d819..38472df2a 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -326,7 +326,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} - if err != nil { + if err != nil && err != filer_pb.ErrNotFound { resp.Error = err.Error() } return resp, nil diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index d04053df5..2da129ab2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -23,6 +23,7 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" @@ -109,6 +110,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) os.MkdirAll(option.DefaultLevelDbDir, 0755) } glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) + } else { + glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir) } util.LoadConfiguration("notification", false) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index d55bf7cbb..4d61193ec 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -6,6 +6,7 @@ import ( "io" "mime" "net/http" + "net/url" "path/filepath" "strconv" "strings" @@ -121,6 +122,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, setEtag(w, etag) filename := entry.Name() + filename = url.QueryEscape(filename) adjustHeaderContentDisposition(w, r, filename) totalSize := int64(entry.Size()) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 99345550c..f303ba1d4 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -34,8 +34,9 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque } lastFileName := r.FormValue("lastFileName") + namePattern := r.FormValue("namePattern") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "") + entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 82c6c11b4..eee39152b 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -209,17 +209,36 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque for { limitedReader := io.LimitReader(partReader, int64(chunkSize)) - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil + data, err := ioutil.ReadAll(limitedReader) + if err != nil { + return nil, nil, 0, err, nil } + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return nil, nil, 0, assignErr, nil + } - // upload the chunk to the volume server - uploadResult, uploadErr, data := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + time.Sleep(251 * time.Millisecond) + continue + } + break + } if uploadErr != nil { return nil, nil, 0, uploadErr, nil } + content = data // if last chunk exhausted the reader exactly at the border diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 497ef4f9e..06ae15c9e 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -86,7 +86,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io // check collection if *collection != "" && strings.HasPrefix(*locationPrefix, "/buckets/") { - return fmt.Errorf("one s3 bucket goes to one collection and not customizable.") + return fmt.Errorf("one s3 bucket goes to one collection and not customizable") } // check replication diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_s3_bucket_create.go index 52d96e4c3..a512ffc4a 100644 --- a/weed/shell/command_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -12,29 +12,31 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketCreate{}) + Commands = append(Commands, &commandS3BucketCreate{}) } -type commandBucketCreate struct { +type commandS3BucketCreate struct { } -func (c *commandBucketCreate) Name() string { - return "bucket.create" +func (c *commandS3BucketCreate) Name() string { + return "s3.bucket.create" } -func (c *commandBucketCreate) Help() string { +func (c *commandS3BucketCreate) Help() string { return `create a bucket with a given name Example: - bucket.create -name <bucket_name> -replication 001 + s3.bucket.create -name <bucket_name> -replication 001 ` } -func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketName := bucketCommand.String("name", "", "bucket name") - replication := bucketCommand.String("replication", "", "replication setting for the bucket") + replication := bucketCommand.String("replication", "", "replication setting for the bucket, if not set "+ + "it will honor the value defined by the filer if it exists, "+ + "else it will honor the value defined on the master") if err = bucketCommand.Parse(args); err != nil { return nil } diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index 02790b9e2..a8d8c5c29 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -9,24 +9,24 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketDelete{}) + Commands = append(Commands, &commandS3BucketDelete{}) } -type commandBucketDelete struct { +type commandS3BucketDelete struct { } -func (c *commandBucketDelete) Name() string { - return "bucket.delete" +func (c *commandS3BucketDelete) Name() string { + return "s3.bucket.delete" } -func (c *commandBucketDelete) Help() string { +func (c *commandS3BucketDelete) Help() string { return `delete a bucket by a given name - bucket.delete -name <bucket_name> + s3.bucket.delete -name <bucket_name> ` } -func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) bucketName := bucketCommand.String("name", "", "bucket name") diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 2e446b6b2..4acf9a866 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -11,23 +11,23 @@ import ( ) func init() { - Commands = append(Commands, &commandBucketList{}) + Commands = append(Commands, &commandS3BucketList{}) } -type commandBucketList struct { +type commandS3BucketList struct { } -func (c *commandBucketList) Name() string { - return "bucket.list" +func (c *commandS3BucketList) Name() string { + return "s3.bucket.list" } -func (c *commandBucketList) Help() string { +func (c *commandS3BucketList) Help() string { return `list all buckets ` } -func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { +func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) if err = bucketCommand.Parse(args); err != nil { diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 7712c5eda..80a74c3e3 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -47,7 +47,7 @@ type Volume struct { volumeInfo *volume_server_pb.VolumeInfo location *DiskLocation - lastIoError error + lastIoError error } func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 34eee876d..a6efc630d 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -92,7 +92,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if err == nil && alsoLoadIndex { // adjust for existing volumes with .idx together with .dat files if v.dirIdx != v.dir { - if util.FileExists(v.DataFileName()+".idx") { + if util.FileExists(v.DataFileName() + ".idx") { v.dirIdx = v.dir } } @@ -100,12 +100,12 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if v.noWriteOrDelete { glog.V(0).Infoln("open to read file", v.FileName(".idx")) if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil { - return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) + return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) } } else { glog.V(1).Infoln("open to write file", v.FileName(".idx")) if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil { - return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) + return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) } } if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { @@ -115,7 +115,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind if v.noWriteOrDelete || v.noWriteCanDelete { if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil { - glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) + glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) } } else { switch needleMapKind { diff --git a/weed/util/config.go b/weed/util/config.go index 47f433028..94e621e34 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -19,11 +19,11 @@ type Configuration interface { func LoadConfiguration(configFileName string, required bool) (loaded bool) { // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in + viper.SetConfigName(configFileName) // name of config file (without extension) + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths + viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in + viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) diff --git a/weed/util/constants.go b/weed/util/constants.go index 89155e9a2..08d169545 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 16) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 18) COMMIT = "" ) |
