diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/cassandra_store/cassandra_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/embedded_filer/files_in_leveldb.go | 4 | ||||
| -rw-r--r-- | weed/filer/filer.go | 6 | ||||
| -rw-r--r-- | weed/filer/mysql_store/README.md | 67 | ||||
| -rw-r--r-- | weed/filer/mysql_store/mysql_store.go | 270 | ||||
| -rw-r--r-- | weed/filer/mysql_store/mysql_store_test.go | 30 | ||||
| -rw-r--r-- | weed/filer/redis_store/redis_store.go | 4 |
7 files changed, 381 insertions, 2 deletions
diff --git a/weed/filer/cassandra_store/cassandra_store.go b/weed/filer/cassandra_store/cassandra_store.go index cdb9d3e3c..50a792a65 100644 --- a/weed/filer/cassandra_store/cassandra_store.go +++ b/weed/filer/cassandra_store/cassandra_store.go @@ -3,6 +3,7 @@ package cassandra_store import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/gocql/gocql" @@ -59,6 +60,7 @@ func (c *CassandraStore) Get(fullFileName string) (fid string, err error) { fullFileName).Consistency(gocql.One).Scan(&output); err != nil { if err != gocql.ErrNotFound { glog.V(0).Infof("Failed to find file %s: %v", fullFileName, fid, err) + return "", filer.ErrNotFound } } if len(output) == 0 { diff --git a/weed/filer/embedded_filer/files_in_leveldb.go b/weed/filer/embedded_filer/files_in_leveldb.go index 19f6dd7e8..c40d7adaf 100644 --- a/weed/filer/embedded_filer/files_in_leveldb.go +++ b/weed/filer/embedded_filer/files_in_leveldb.go @@ -53,7 +53,9 @@ func (fl *FileListInLevelDb) DeleteFile(dirId filer.DirectoryId, fileName string } func (fl *FileListInLevelDb) FindFile(dirId filer.DirectoryId, fileName string) (fid string, err error) { data, e := fl.db.Get(genKey(dirId, fileName), nil) - if e != nil { + if e == leveldb.ErrNotFound { + return "", filer.ErrNotFound + } else if e != nil { return "", e } return string(data), nil diff --git a/weed/filer/filer.go b/weed/filer/filer.go index fd23e119c..5d5acb68d 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -1,5 +1,9 @@ package filer +import ( + "errors" +) + type FileId string //file id in SeaweedFS type FileEntry struct { @@ -26,3 +30,5 @@ type Filer interface { DeleteDirectory(dirPath string, recursive bool) (err error) Move(fromPath string, toPath string) (err error) } + +var ErrNotFound = errors.New("filer: no entry is found in filer store") diff --git a/weed/filer/mysql_store/README.md b/weed/filer/mysql_store/README.md new file mode 100644 index 000000000..6efeb1c54 --- /dev/null +++ b/weed/filer/mysql_store/README.md @@ -0,0 +1,67 @@ +#MySQL filer mapping store + +## Schema format + + +Basically, uriPath and fid are the key elements stored in MySQL. In view of the optimization and user's usage, +adding primary key with integer type and involving createTime, updateTime, status fields should be somewhat meaningful. +Of course, you could customize the schema per your concretely circumstance freely. + +<pre><code> +CREATE TABLE IF NOT EXISTS `filer_mapping` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT, + `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', + `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', + `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', + `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', + `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', + `status` tinyint(2) DEFAULT '1' COMMENT 'resource status', + PRIMARY KEY (`id`), + UNIQUE KEY `index_uriPath` (`uriPath`) +) DEFAULT CHARSET=utf8; +</code></pre> + + +The MySQL 's config params is not added into the weed command option as other stores(redis,cassandra). Instead, +We created a config file(json format) for them. TOML,YAML or XML also should be OK. But TOML and YAML need import thirdparty package +while XML is a little bit complex. + +The sample config file's content is below: + +<pre><code> +{ + "mysql": [ + { + "User": "root", + "Password": "root", + "HostName": "127.0.0.1", + "Port": 3306, + "DataBase": "seaweedfs" + }, + { + "User": "root", + "Password": "root", + "HostName": "127.0.0.2", + "Port": 3306, + "DataBase": "seaweedfs" + } + ], + "IsSharding":true, + "ShardCount":1024 +} +</code></pre> + + +The "mysql" field in above conf file is an array which include all mysql instances you prepared to store sharding data. + +1. If one mysql instance is enough, just keep one instance in "mysql" field. + +2. If table sharding at a specific mysql instance is needed , mark "IsSharding" field with true and specify total table sharding numbers using "ShardCount" field. + +3. If the mysql service could be auto scaled transparently in your environment, just config one mysql instance(usually it's a frondend proxy or VIP),and mark "IsSharding" with false value + +4. If you prepare more than one mysql instance and have no plan to use table sharding for any instance(mark isSharding with false), instance sharding will still be done implicitly + + + + diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go new file mode 100644 index 000000000..6910206ce --- /dev/null +++ b/weed/filer/mysql_store/mysql_store.go @@ -0,0 +1,270 @@ +package mysql_store + +import ( + "database/sql" + "fmt" + "hash/crc32" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + + _ "github.com/go-sql-driver/mysql" +) + +const ( + sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8" + default_maxIdleConnections = 100 + default_maxOpenConnections = 50 + default_maxTableNums = 1024 + tableName = "filer_mapping" +) + +var ( + _init_db sync.Once + _db_connections []*sql.DB +) + +type MySqlConf struct { + User string + Password string + HostName string + Port int + DataBase string + MaxIdleConnections int + MaxOpenConnections int +} + +type ShardingConf struct { + IsSharding bool `json:"isSharding"` + ShardCount int `json:"shardCount"` +} + +type MySqlStore struct { + dbs []*sql.DB + isSharding bool + shardCount int +} + +func getDbConnection(confs []MySqlConf) []*sql.DB { + _init_db.Do(func() { + for _, conf := range confs { + + sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase) + var dbErr error + _db_connection, dbErr := sql.Open("mysql", sqlUrl) + if dbErr != nil { + _db_connection.Close() + _db_connection = nil + panic(dbErr) + } + var maxIdleConnections, maxOpenConnections int + + if conf.MaxIdleConnections != 0 { + maxIdleConnections = conf.MaxIdleConnections + } else { + maxIdleConnections = default_maxIdleConnections + } + if conf.MaxOpenConnections != 0 { + maxOpenConnections = conf.MaxOpenConnections + } else { + maxOpenConnections = default_maxOpenConnections + } + + _db_connection.SetMaxIdleConns(maxIdleConnections) + _db_connection.SetMaxOpenConns(maxOpenConnections) + _db_connections = append(_db_connections, _db_connection) + } + }) + return _db_connections +} + +func NewMysqlStore(confs []MySqlConf, isSharding bool, shardCount int) *MySqlStore { + ms := &MySqlStore{ + dbs: getDbConnection(confs), + isSharding: isSharding, + shardCount: shardCount, + } + + for _, db := range ms.dbs { + if !isSharding { + ms.shardCount = 1 + } else { + if ms.shardCount == 0 { + ms.shardCount = default_maxTableNums + } + } + for i := 0; i < ms.shardCount; i++ { + if err := ms.createTables(db, tableName, i); err != nil { + fmt.Printf("create table failed %v", err) + } + } + } + + return ms +} + +func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) { + hash_value := crc32.ChecksumIEEE([]byte(fullFileName)) + instance_offset = int(hash_value) % len(s.dbs) + table_postfix = int(hash_value) % s.shardCount + return +} + +func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) { + instance_offset, table_postfix := s.hash(path) + instanceId = instance_offset + if s.isSharding { + tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix) + } else { + tableFullName = tableName + } + return +} + +func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) { + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return "", fmt.Errorf("MySqlStore Get operation can not parse file path %s: err is %v", fullFilePath, err) + } + fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName) + if err == sql.ErrNoRows { + //Could not found + err = filer.ErrNotFound + } + return fid, err +} + +func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) { + var tableFullName string + + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return fmt.Errorf("MySqlStore Put operation can not parse file path %s: err is %v", fullFilePath, err) + } + var old_fid string + if old_fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil && err != sql.ErrNoRows { + return fmt.Errorf("MySqlStore Put operation failed when querying path %s: err is %v", fullFilePath, err) + } else { + if len(old_fid) == 0 { + err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + err = fmt.Errorf("MySqlStore Put operation failed when inserting path %s with fid %s : err is %v", fullFilePath, fid, err) + } else { + err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName) + err = fmt.Errorf("MySqlStore Put operation failed when updating path %s with fid %s : err is %v", fullFilePath, fid, err) + } + } + return +} + +func (s *MySqlStore) Delete(fullFilePath string) (err error) { + var fid string + instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath) + if err != nil { + return fmt.Errorf("MySqlStore Delete operation can not parse file path %s: err is %v", fullFilePath, err) + } + if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { + return fmt.Errorf("MySqlStore Delete operation failed when querying path %s: err is %v", fullFilePath, err) + } else if fid == "" { + return nil + } + if err = s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil { + return fmt.Errorf("MySqlStore Delete operation failed when deleting path %s: err is %v", fullFilePath, err) + } else { + return nil + } +} + +func (s *MySqlStore) Close() { + for _, db := range s.dbs { + db.Close() + } +} + +var createTable = ` +CREATE TABLE IF NOT EXISTS %s ( + id bigint(20) NOT NULL AUTO_INCREMENT, + uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath', + fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid', + createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp', + updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp', + remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field', + status tinyint(2) DEFAULT '1' COMMENT 'resource status', + PRIMARY KEY (id), + UNIQUE KEY index_uriPath (uriPath) +) DEFAULT CHARSET=utf8; +` + +func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error { + var realTableName string + if s.isSharding { + realTableName = fmt.Sprintf("%s_%4d", tableName, postfix) + } else { + realTableName = tableName + } + + stmt, err := db.Prepare(fmt.Sprintf(createTable, realTableName)) + if err != nil { + return err + } + defer stmt.Close() + + _, err = stmt.Exec() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) { + sqlStatement := "SELECT fid FROM %s WHERE uriPath=?" + row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath) + var fid string + err := row.Scan(&fid) + if err != nil { + return "", err + } + return fid, nil +} + +func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error { + sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix()) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} + +func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error { + sqlStatement := "DELETE FROM %s WHERE uriPath=?" + res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath) + if err != nil { + return err + } + + _, err = res.RowsAffected() + if err != nil { + return err + } + return nil +} diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go new file mode 100644 index 000000000..1c9765c59 --- /dev/null +++ b/weed/filer/mysql_store/mysql_store_test.go @@ -0,0 +1,30 @@ +package mysql_store + +import ( + "encoding/json" + "hash/crc32" + "testing" +) + +func TestGenerateMysqlConf(t *testing.T) { + var conf []MySqlConf + conf = append(conf, MySqlConf{ + User: "root", + Password: "root", + HostName: "localhost", + Port: 3306, + DataBase: "seaweedfs", + }) + body, err := json.Marshal(conf) + if err != nil { + t.Errorf("json encoding err %s", err.Error()) + } + t.Logf("json output is %s", string(body)) +} + +func TestCRC32FullPathName(t *testing.T) { + fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf" + hash_value := crc32.ChecksumIEEE([]byte(fullPathName)) + table_postfix := int(hash_value) % 1024 + t.Logf("table postfix %d", table_postfix) +} diff --git a/weed/filer/redis_store/redis_store.go b/weed/filer/redis_store/redis_store.go index 5e51b5455..2ad49a805 100644 --- a/weed/filer/redis_store/redis_store.go +++ b/weed/filer/redis_store/redis_store.go @@ -1,6 +1,8 @@ package redis_store import ( + "github.com/chrislusf/seaweedfs/weed/filer" + redis "gopkg.in/redis.v2" ) @@ -20,7 +22,7 @@ func NewRedisStore(hostPort string, password string, database int) *RedisStore { func (s *RedisStore) Get(fullFileName string) (fid string, err error) { fid, err = s.Client.Get(fullFileName).Result() if err == redis.Nil { - err = nil + err = filer.ErrNotFound } return fid, err } |
