aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2016-09-07 21:07:11 -0700
committerGitHub <noreply@github.com>2016-09-07 21:07:11 -0700
commit99e47fc5fcd5743324feda9498a699717761924d (patch)
treeb57f95a290b27c8b83abaf953ab5c47b8ef1ce82 /weed/filer
parent0559aa96738d727e055190301bbb0624c74ced7f (diff)
parent78474409a596aabd8e579716ba1f4939e7d62579 (diff)
downloadseaweedfs-99e47fc5fcd5743324feda9498a699717761924d.tar.xz
seaweedfs-99e47fc5fcd5743324feda9498a699717761924d.zip
Merge pull request #361 from hxiaodon/master
add mysql filer support
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/cassandra_store/cassandra_store.go2
-rw-r--r--weed/filer/embedded_filer/files_in_leveldb.go4
-rw-r--r--weed/filer/filer.go6
-rw-r--r--weed/filer/mysql_store/README.md67
-rw-r--r--weed/filer/mysql_store/mysql_store.go270
-rw-r--r--weed/filer/mysql_store/mysql_store_test.go30
-rw-r--r--weed/filer/redis_store/redis_store.go4
7 files changed, 381 insertions, 2 deletions
diff --git a/weed/filer/cassandra_store/cassandra_store.go b/weed/filer/cassandra_store/cassandra_store.go
index cdb9d3e3c..50a792a65 100644
--- a/weed/filer/cassandra_store/cassandra_store.go
+++ b/weed/filer/cassandra_store/cassandra_store.go
@@ -3,6 +3,7 @@ package cassandra_store
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/gocql/gocql"
@@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
if err != gocql.ErrNotFound {
glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err)
+ return "", filer.ErrNotFound
}
}
if len(output) == 0 {
diff --git a/weed/filer/embedded_filer/files_in_leveldb.go b/weed/filer/embedded_filer/files_in_leveldb.go
index 19f6dd7e8..c40d7adaf 100644
--- a/weed/filer/embedded_filer/files_in_leveldb.go
+++ b/weed/filer/embedded_filer/files_in_leveldb.go
@@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string
}
func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) {
data, e := fl.db.Get(genKey(dirId, fileName), nil)
- if e != nil {
+ if e == leveldb.ErrNotFound {
+ return "", filer.ErrNotFound
+ } else if e != nil {
return "", e
}
return string(data), nil
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index fd23e119c..5d5acb68d 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -1,5 +1,9 @@
package filer
+import (
+ "errors"
+)
+
type FileId string //file id in SeaweedFS
type FileEntry struct {
@@ -26,3 +30,5 @@ type Filer interface {
DeleteDirectory(dirPath string, recursive bool) (err error)
Move(fromPath string, toPath string) (err error)
}
+
+var ErrNotFound = errors.New("filer: no entry is found in filer store")
diff --git a/weed/filer/mysql_store/README.md b/weed/filer/mysql_store/README.md
new file mode 100644
index 000000000..6efeb1c54
--- /dev/null
+++ b/weed/filer/mysql_store/README.md
@@ -0,0 +1,67 @@
+#MySQL filer mapping store
+
+## Schema format
+
+
+Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage,
+adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful.
+Of course, you could customize the schema per your concretely circumstance freely.
+
+<pre><code>
+CREATE TABLE IF NOT EXISTS `filer_mapping` (
+ `id` bigint(20) NOT NULL AUTO_INCREMENT,
+ `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+ `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+ `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+ `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+ `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
+ `status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_uriPath` (`uriPath`)
+) DEFAULT CHARSET=utf8;
+</code></pre>
+
+
+The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead,
+We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package
+while XML is a little bit complex.
+
+The sample config file's content is below:
+
+<pre><code>
+{
+ "mysql": [
+ {
+ "User": "root",
+ "Password": "root",
+ "HostName": "127.0.0.1",
+ "Port": 3306,
+ "DataBase": "seaweedfs"
+ },
+ {
+ "User": "root",
+ "Password": "root",
+ "HostName": "127.0.0.2",
+ "Port": 3306,
+ "DataBase": "seaweedfs"
+ }
+ ],
+ "IsSharding":true,
+ "ShardCount":1024
+}
+</code></pre>
+
+
+The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data.
+
+1. If one mysql instance is enough, just keep one instance in "mysql" field.
+
+2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field.
+
+3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value
+
+4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly
+
+
+
+
diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go
new file mode 100644
index 000000000..6910206ce
--- /dev/null
+++ b/weed/filer/mysql_store/mysql_store.go
@@ -0,0 +1,270 @@
+package mysql_store
+
+import (
+ "database/sql"
+ "fmt"
+ "hash/crc32"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+ default_maxIdleConnections = 100
+ default_maxOpenConnections = 50
+ default_maxTableNums = 1024
+ tableName = "filer_mapping"
+)
+
+var (
+ _init_db sync.Once
+ _db_connections []*sql.DB
+)
+
+type MySqlConf struct {
+ User string
+ Password string
+ HostName string
+ Port int
+ DataBase string
+ MaxIdleConnections int
+ MaxOpenConnections int
+}
+
+type ShardingConf struct {
+ IsSharding bool `json:"isSharding"`
+ ShardCount int `json:"shardCount"`
+}
+
+type MySqlStore struct {
+ dbs []*sql.DB
+ isSharding bool
+ shardCount int
+}
+
+func getDbConnection(confs []MySqlConf) []*sql.DB {
+ _init_db.Do(func() {
+ for _, conf := range confs {
+
+ sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
+ var dbErr error
+ _db_connection, dbErr := sql.Open("mysql", sqlUrl)
+ if dbErr != nil {
+ _db_connection.Close()
+ _db_connection = nil
+ panic(dbErr)
+ }
+ var maxIdleConnections, maxOpenConnections int
+
+ if conf.MaxIdleConnections != 0 {
+ maxIdleConnections = conf.MaxIdleConnections
+ } else {
+ maxIdleConnections = default_maxIdleConnections
+ }
+ if conf.MaxOpenConnections != 0 {
+ maxOpenConnections = conf.MaxOpenConnections
+ } else {
+ maxOpenConnections = default_maxOpenConnections
+ }
+
+ _db_connection.SetMaxIdleConns(maxIdleConnections)
+ _db_connection.SetMaxOpenConns(maxOpenConnections)
+ _db_connections = append(_db_connections, _db_connection)
+ }
+ })
+ return _db_connections
+}
+
+func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore {
+ ms := &MySqlStore{
+ dbs: getDbConnection(confs),
+ isSharding: isSharding,
+ shardCount: shardCount,
+ }
+
+ for _, db := range ms.dbs {
+ if !isSharding {
+ ms.shardCount = 1
+ } else {
+ if ms.shardCount == 0 {
+ ms.shardCount = default_maxTableNums
+ }
+ }
+ for i := 0; i < ms.shardCount; i++ {
+ if err := ms.createTables(db, tableName, i); err != nil {
+ fmt.Printf("create table failed %v", err)
+ }
+ }
+ }
+
+ return ms
+}
+
+func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
+ hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
+ instance_offset = int(hash_value) % len(s.dbs)
+ table_postfix = int(hash_value) % s.shardCount
+ return
+}
+
+func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
+ instance_offset, table_postfix := s.hash(path)
+ instanceId = instance_offset
+ if s.isSharding {
+ tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
+ } else {
+ tableFullName = tableName
+ }
+ return
+}
+
+func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err)
+ }
+ fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
+ if err == sql.ErrNoRows {
+ //Could not found
+ err = filer.ErrNotFound
+ }
+ return fid, err
+}
+
+func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
+ var tableFullName string
+
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err)
+ }
+ var old_fid string
+ if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows {
+ return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err)
+ } else {
+ if len(old_fid) == 0 {
+ err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
+ err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err)
+ } else {
+ err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
+ err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err)
+ }
+ }
+ return
+}
+
+func (s *MySqlStore) Delete(fullFilePath string) (err error) {
+ var fid string
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err)
+ }
+ if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
+ return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err)
+ } else if fid == "" {
+ return nil
+ }
+ if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
+ return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err)
+ } else {
+ return nil
+ }
+}
+
+func (s *MySqlStore) Close() {
+ for _, db := range s.dbs {
+ db.Close()
+ }
+}
+
+var createTable = `
+CREATE TABLE IF NOT EXISTS %s (
+ id bigint(20) NOT NULL AUTO_INCREMENT,
+ uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+ fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+ createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+ updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+ remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
+ status tinyint(2) DEFAULT '1' COMMENT 'resource status',
+ PRIMARY KEY (id),
+ UNIQUE KEY index_uriPath (uriPath)
+) DEFAULT CHARSET=utf8;
+`
+
+func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
+ var realTableName string
+ if s.isSharding {
+ realTableName = fmt.Sprintf("%s_%4d", tableName, postfix)
+ } else {
+ realTableName = tableName
+ }
+
+ stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName))
+ if err != nil {
+ return err
+ }
+ defer stmt.Close()
+
+ _, err = stmt.Exec()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
+ sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
+ row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
+ var fid string
+ err := row.Scan(&fid)
+ if err != nil {
+ return "", err
+ }
+ return fid, nil
+}
+
+func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
+ sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
+ sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
+ sqlStatement := "DELETE FROM %s WHERE uriPath=?"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go
new file mode 100644
index 000000000..1c9765c59
--- /dev/null
+++ b/weed/filer/mysql_store/mysql_store_test.go
@@ -0,0 +1,30 @@
+package mysql_store
+
+import (
+ "encoding/json"
+ "hash/crc32"
+ "testing"
+)
+
+func TestGenerateMysqlConf(t *testing.T) {
+ var conf []MySqlConf
+ conf = append(conf, MySqlConf{
+ User: "root",
+ Password: "root",
+ HostName: "localhost",
+ Port: 3306,
+ DataBase: "seaweedfs",
+ })
+ body, err := json.Marshal(conf)
+ if err != nil {
+ t.Errorf("json encoding err %s", err.Error())
+ }
+ t.Logf("json output is %s", string(body))
+}
+
+func TestCRC32FullPathName(t *testing.T) {
+ fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf"
+ hash_value := crc32.ChecksumIEEE([]byte(fullPathName))
+ table_postfix := int(hash_value) % 1024
+ t.Logf("table postfix %d", table_postfix)
+}
diff --git a/weed/filer/redis_store/redis_store.go b/weed/filer/redis_store/redis_store.go
index 5e51b5455..2ad49a805 100644
--- a/weed/filer/redis_store/redis_store.go
+++ b/weed/filer/redis_store/redis_store.go
@@ -1,6 +1,8 @@
package redis_store
import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
redis "gopkg.in/redis.v2"
)
@@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore {
func (s *RedisStore) Get(fullFileName string) (fid string, err error) {
fid, err = s.Client.Get(fullFileName).Result()
if err == redis.Nil {
- err = nil
+ err = filer.ErrNotFound
}
return fid, err
}