aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/filer.go5
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/filer/mysql_store/filer_mapping.sql11
-rw-r--r--weed/filer/mysql_store/mysql_store.go224
-rw-r--r--weed/filer/mysql_store/mysql_store_test.go60
-rw-r--r--weed/server/filer_server.go42
6 files changed, 340 insertions, 4 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 0bd508e0b..7d90707a6 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -24,7 +24,8 @@ type FilerOptions struct {
dir *string
redirectOnRead *bool
disableDirListing *bool
- maxMB *int
+ confFile *string
+ maxMB *int
secretKey *string
cassandra_server *string
cassandra_keyspace *string
@@ -43,6 +44,7 @@ func init() {
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
+ f.confFile = cmdFiler.Flag.String("confFile", "", "json encoded filer conf file")
f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@@ -84,6 +86,7 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing,
+ *f.confFile,
*f.maxMB,
*f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace,
diff --git a/weed/command/server.go b/weed/command/server.go
index 7a6677a65..eed7dcae4 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -86,6 +86,7 @@ func init() {
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
+ filerOptions.confFile = cmdServer.Flag.String("filer.confFile", "", "json encoded filer conf file")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
@@ -170,6 +171,7 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement,
*filerOptions.redirectOnRead, *filerOptions.disableDirListing,
+ *filerOptions.confFile,
*filerOptions.maxMB,
*filerOptions.secretKey,
*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,
diff --git a/weed/filer/mysql_store/filer_mapping.sql b/weed/filer/mysql_store/filer_mapping.sql
new file mode 100644
index 000000000..6bbe4e880
--- /dev/null
+++ b/weed/filer/mysql_store/filer_mapping.sql
@@ -0,0 +1,11 @@
+CREATE TABLE IF NOT EXISTS `filer_mapping` (
+ `id` bigint(20) NOT NULL AUTO_INCREMENT,
+ `uriPath` char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+ `fid` char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+ `createTime` int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+ `updateTime` int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+ `remark` varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
+ `status` tinyint(2) DEFAULT '1' COMMENT 'resource status',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_uriPath` (`uriPath`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8; \ No newline at end of file
diff --git a/weed/filer/mysql_store/mysql_store.go b/weed/filer/mysql_store/mysql_store.go
new file mode 100644
index 000000000..44d0d88a7
--- /dev/null
+++ b/weed/filer/mysql_store/mysql_store.go
@@ -0,0 +1,224 @@
+package mysql_store
+
+import (
+ "database/sql"
+ "fmt"
+ "hash/crc32"
+ "sync"
+ "time"
+
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ sqlUrl = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+ maxIdleConnections = 100
+ maxOpenConnections = 50
+ maxTableNums = 1024
+ tableName = "filer_mapping"
+)
+
+var (
+ _init_db sync.Once
+ _db_connections []*sql.DB
+)
+
+type MySqlConf struct {
+ User string
+ Password string
+ HostName string
+ Port int
+ DataBase string
+}
+
+type MySqlStore struct {
+ dbs []*sql.DB
+}
+
+func getDbConnection(confs []MySqlConf) []*sql.DB {
+ _init_db.Do(func() {
+ for _, conf := range confs {
+
+ sqlUrl := fmt.Sprintf(sqlUrl, conf.User, conf.Password, conf.HostName, conf.Port, conf.DataBase)
+ var dbErr error
+ _db_connection, dbErr := sql.Open("mysql", sqlUrl)
+ if dbErr != nil {
+ _db_connection.Close()
+ _db_connection = nil
+ panic(dbErr)
+ }
+ _db_connection.SetMaxIdleConns(maxIdleConnections)
+ _db_connection.SetMaxOpenConns(maxOpenConnections)
+ _db_connections = append(_db_connections, _db_connection)
+ }
+ })
+ return _db_connections
+}
+
+func NewMysqlStore(confs []MySqlConf) *MySqlStore {
+ ms := &MySqlStore{
+ dbs: getDbConnection(confs),
+ }
+
+ for _, db := range ms.dbs {
+ for i := 0; i < maxTableNums; i++ {
+ if err := ms.createTables(db, tableName, i); err != nil {
+ fmt.Printf("create table failed %s", err.Error())
+ }
+ }
+ }
+
+ return ms
+}
+
+func (s *MySqlStore) hash(fullFileName string) (instance_offset, table_postfix int) {
+ hash_value := crc32.ChecksumIEEE([]byte(fullFileName))
+ instance_offset = int(hash_value) % len(s.dbs)
+ table_postfix = int(hash_value) % maxTableNums
+ return
+}
+
+func (s *MySqlStore) parseFilerMappingInfo(path string) (instanceId int, tableFullName string, err error) {
+ instance_offset, table_postfix := s.hash(path)
+ instanceId = instance_offset
+ tableFullName = fmt.Sprintf("%s_%04d", tableName, table_postfix)
+ return
+}
+
+func (s *MySqlStore) Get(fullFilePath string) (fid string, err error) {
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return "", err
+ }
+ fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName)
+ if err == sql.ErrNoRows {
+ //Could not found
+ err = nil
+ }
+ return fid, err
+}
+
+func (s *MySqlStore) Put(fullFilePath string, fid string) (err error) {
+ var tableFullName string
+
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return err
+ }
+ if old_fid, localErr := s.query(fullFilePath, s.dbs[instance_offset], tableFullName); localErr != nil && localErr != sql.ErrNoRows {
+ err = localErr
+ return
+ } else {
+ if len(old_fid) == 0 {
+ err = s.insert(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
+ } else {
+ err = s.update(fullFilePath, fid, s.dbs[instance_offset], tableFullName)
+ }
+ }
+ return
+}
+
+func (s *MySqlStore) Delete(fullFilePath string) (err error) {
+ var fid string
+ instance_offset, tableFullName, err := s.parseFilerMappingInfo(fullFilePath)
+ if err != nil {
+ return err
+ }
+ if fid, err = s.query(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
+ return err
+ } else if fid == "" {
+ return nil
+ }
+ if err := s.delete(fullFilePath, s.dbs[instance_offset], tableFullName); err != nil {
+ return err
+ } else {
+ return nil
+ }
+}
+
+func (s *MySqlStore) Close() {
+ for _, db := range s.dbs {
+ db.Close()
+ }
+}
+
+var createTable = `
+CREATE TABLE IF NOT EXISTS %s_%04d (
+ id bigint(20) NOT NULL AUTO_INCREMENT,
+ uriPath char(256) NOT NULL DEFAULT "" COMMENT 'http uriPath',
+ fid char(36) NOT NULL DEFAULT "" COMMENT 'seaweedfs fid',
+ createTime int(10) NOT NULL DEFAULT 0 COMMENT 'createdTime in unix timestamp',
+ updateTime int(10) NOT NULL DEFAULT 0 COMMENT 'updatedTime in unix timestamp',
+ remark varchar(20) NOT NULL DEFAULT "" COMMENT 'reserverd field',
+ status tinyint(2) DEFAULT '1' COMMENT 'resource status',
+ PRIMARY KEY (id),
+ UNIQUE KEY index_uriPath (uriPath)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+`
+
+func (s *MySqlStore) createTables(db *sql.DB, tableName string, postfix int) error {
+ stmt, err := db.Prepare(fmt.Sprintf(createTable, tableName, postfix))
+ if err != nil {
+ return err
+ }
+ defer stmt.Close()
+
+ _, err = stmt.Exec()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) query(uriPath string, db *sql.DB, tableName string) (string, error) {
+ sqlStatement := "SELECT fid FROM %s WHERE uriPath=?"
+ row := db.QueryRow(fmt.Sprintf(sqlStatement, tableName), uriPath)
+ var fid string
+ err := row.Scan(&fid)
+ if err != nil {
+ return "", err
+ }
+ return fid, nil
+}
+
+func (s *MySqlStore) update(uriPath string, fid string, db *sql.DB, tableName string) error {
+ sqlStatement := "UPDATE %s SET fid=?, updateTime=? WHERE uriPath=?"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), fid, time.Now().Unix(), uriPath)
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) insert(uriPath string, fid string, db *sql.DB, tableName string) error {
+ sqlStatement := "INSERT INTO %s (uriPath,fid,createTime) VALUES(?,?,?)"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath, fid, time.Now().Unix())
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *MySqlStore) delete(uriPath string, db *sql.DB, tableName string) error {
+ sqlStatement := "DELETE FROM %s WHERE uriPath=?"
+ res, err := db.Exec(fmt.Sprintf(sqlStatement, tableName), uriPath)
+ if err != nil {
+ return err
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/filer/mysql_store/mysql_store_test.go b/weed/filer/mysql_store/mysql_store_test.go
new file mode 100644
index 000000000..e89ca9020
--- /dev/null
+++ b/weed/filer/mysql_store/mysql_store_test.go
@@ -0,0 +1,60 @@
+package mysql_store
+
+import (
+ "encoding/json"
+ "hash/crc32"
+ "testing"
+)
+
+/*
+To improve performance when storing billion of files, you could shar
+At each mysql instance, we will try to create 1024 tables if not exist, table name will be something like:
+filer_mapping_0000
+filer_mapping_0001
+.....
+filer_mapping_1023
+sample conf should be
+
+>$cat filer_conf.json
+{
+ "mysql": [
+ {
+ "User": "root",
+ "Password": "root",
+ "HostName": "127.0.0.1",
+ "Port": 3306,
+ "DataBase": "seaweedfs"
+ },
+ {
+ "User": "root",
+ "Password": "root",
+ "HostName": "127.0.0.2",
+ "Port": 3306,
+ "DataBase": "seaweedfs"
+ }
+ ]
+}
+*/
+
+func TestGenerateMysqlConf(t *testing.T) {
+ var conf MySqlConf
+ conf = append(conf, MySqlInstConf{
+ User: "root",
+ Password: "root",
+ HostName: "localhost",
+ Port: 3306,
+ DataBase: "seaweedfs",
+ })
+ body, err := json.Marshal(conf)
+ if err != nil {
+ t.Errorf("json encoding err %s", err.Error())
+ }
+ t.Logf("json output is %s", string(body))
+}
+
+func TestCRC32FullPathName(t *testing.T) {
+ fullPathName := "/prod-bucket/law632191483895612493300-signed.pdf"
+ hash_value := crc32.ChecksumIEEE([]byte(fullPathName))
+ table_postfix := int(hash_value) % 1024
+ t.Logf("table postfix %d", table_postfix)
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 3c7c1fd9e..1da0d065d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,8 +1,10 @@
package weed_server
import (
+ "encoding/json"
"math/rand"
"net/http"
+ "os"
"strconv"
"sync"
"time"
@@ -11,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
"github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
"github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
+ "github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
"github.com/chrislusf/seaweedfs/weed/filer/redis_store"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -18,6 +21,25 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+type filerConf struct {
+ MysqlConf []mysql_store.MySqlConf `json:"mysql"`
+}
+
+func parseConfFile(confPath string) (*filerConf, error) {
+ var setting filerConf
+ configFile, err := os.Open(confPath)
+ defer configFile.Close()
+ if err != nil {
+ return nil, err
+ }
+
+ jsonParser := json.NewDecoder(configFile)
+ if err = jsonParser.Decode(&setting); err != nil {
+ return nil, err
+ }
+ return &setting, nil
+}
+
type FilerServer struct {
port string
master string
@@ -28,12 +50,13 @@ type FilerServer struct {
disableDirListing bool
secret security.Secret
filer filer.Filer
- maxMB int
+ maxMB int
masterNodes *storage.MasterNodes
}
func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string,
replication string, redirectOnRead bool, disableDirListing bool,
+ confFile string,
maxMB int,
secret string,
cassandra_server string, cassandra_keyspace string,
@@ -45,11 +68,24 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
defaultReplication: replication,
redirectOnRead: redirectOnRead,
disableDirListing: disableDirListing,
- maxMB: maxMB,
+ maxMB: maxMB,
port: ip + ":" + strconv.Itoa(port),
}
- if cassandra_server != "" {
+ var setting *filerConf
+ if confFile != "" {
+ setting, err = parseConfFile(confFile)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ setting = new(filerConf)
+ }
+
+ if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
+ mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf)
+ fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
+ } else if cassandra_server != "" {
cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
if err != nil {
glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)