aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go2
-rw-r--r--weed/filer/entry.go1
-rw-r--r--weed/filer/entry_codec.go2
-rw-r--r--weed/filer/filechunk_manifest.go2
-rw-r--r--weed/filer/filer_conf.go2
-rw-r--r--weed/filer/filer_notify.go5
-rw-r--r--weed/filer/filer_notify_append.go2
-rw-r--r--weed/filer/mysql/mysql_sql_gen.go14
-rw-r--r--weed/filer/mysql/mysql_store.go6
-rw-r--r--weed/filer/mysql2/mysql2_store.go6
-rw-r--r--weed/filer/postgres/postgres_sql_gen.go14
-rw-r--r--weed/filer/postgres/postgres_store.go13
-rw-r--r--weed/filer/postgres2/postgres2_store.go13
-rw-r--r--weed/filer/read_write.go2
-rw-r--r--weed/filer/stream.go2
15 files changed, 55 insertions, 31 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 91b0bc98f..07ce56145 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -107,7 +107,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full
}
if _, found := store.dbs[bucket]; !found {
- if err = store.CreateTable(ctx, bucket); err != nil {
+ if err = store.CreateTable(ctx, bucket); err == nil {
store.dbs[bucket] = true
}
}
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index dbe10c9b1..b7c8370e6 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -18,6 +18,7 @@ type Attr struct {
Replication string // replication
Collection string // collection name
TtlSec int32 // ttl in seconds
+ DiskType string
UserName string
GroupNames []string
SymlinkTarget string
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 1693b551e..4c613f068 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -56,6 +56,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
Collection: entry.Attr.Collection,
Replication: entry.Attr.Replication,
TtlSec: entry.Attr.TtlSec,
+ DiskType: entry.Attr.DiskType,
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
@@ -81,6 +82,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.Collection = attr.Collection
t.Replication = attr.Replication
t.TtlSec = attr.TtlSec
+ t.DiskType = attr.DiskType
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 845bfaec1..99a62c90c 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 18ed37abd..b4f0e5890 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -116,7 +116,7 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.Collection = util.Nvl(b.Collection, a.Collection)
a.Replication = util.Nvl(b.Replication, a.Replication)
a.Ttl = util.Nvl(b.Ttl, a.Ttl)
- if b.DiskType != filer_pb.FilerConf_PathConf_NONE {
+ if b.DiskType != "" {
a.DiskType = b.DiskType
}
a.Fsync = b.Fsync || a.Fsync
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index f3a795ad0..c461a82b8 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if notification.Queue != nil {
glog.V(3).Infof("notifying entry update %v", fullpath)
- notification.Queue.SendMessage(fullpath, eventNotification)
+ if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil {
+ // throw message
+ glog.Error(err)
+ }
}
f.logMetaEvent(ctx, fullpath, eventNotification)
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 09c39dd89..d441bbbc9 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -56,7 +56,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
WritableVolumeCount: rule.VolumeGrowthCount,
}
- assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
}
diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go
index 057484c37..4213cf965 100644
--- a/weed/filer/mysql/mysql_sql_gen.go
+++ b/weed/filer/mysql/mysql_sql_gen.go
@@ -16,31 +16,31 @@ var (
)
func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
- return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
+ return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
}
func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
- return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
+ return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
- return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+ return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
- return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+ return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
- return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", bucket)
+ return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", bucket)
}
func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+ return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
}
func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+ return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
}
func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string {
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 686628740..501ab1d39 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -47,12 +47,14 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.SupportBucketTable = false
store.SqlGenerator = &SqlGenMysql{
CreateTableSqlTemplate: "",
- DropTableSqlTemplate: "drop table %s",
+ DropTableSqlTemplate: "drop table `%s`",
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database)
if interpolateParams {
sqlUrl += "&interpolateParams=true"
+ adaptedSqlUrl += "&interpolateParams=true"
}
var dbErr error
@@ -60,7 +62,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
if dbErr != nil {
store.DB.Close()
store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index 15216b651..c796cd6aa 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -50,12 +50,14 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin
store.SupportBucketTable = true
store.SqlGenerator = &mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
- DropTableSqlTemplate: "drop table %s",
+ DropTableSqlTemplate: "drop table `%s`",
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database)
if interpolateParams {
sqlUrl += "&interpolateParams=true"
+ adaptedSqlUrl += "&interpolateParams=true"
}
var dbErr error
@@ -63,7 +65,7 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin
if dbErr != nil {
store.DB.Close()
store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go
index 284cf254b..e13070c3d 100644
--- a/weed/filer/postgres/postgres_sql_gen.go
+++ b/weed/filer/postgres/postgres_sql_gen.go
@@ -17,31 +17,31 @@ var (
)
func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
- return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket)
+ return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, bucket)
}
func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
- return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket)
+ return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, bucket)
}
func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
- return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+ return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
}
func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
- return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+ return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
}
func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
- return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket)
+ return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, bucket)
}
func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+ return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
}
func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+ return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
}
func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string {
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
index 27c6278c7..9e4ff7c32 100644
--- a/weed/filer/postgres/postgres_store.go
+++ b/weed/filer/postgres/postgres_store.go
@@ -3,6 +3,7 @@ package postgres
import (
"database/sql"
"fmt"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
@@ -37,40 +38,46 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
+func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = false
store.SqlGenerator = &SqlGenPostgres{
CreateTableSqlTemplate: "",
- DropTableSqlTemplate: "drop table %s",
+ DropTableSqlTemplate: `drop table "%s"`,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
sqlUrl += " user=" + user
}
+ adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
+ adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
+ adaptedSqlUrl += " dbname=" + database
}
if schema != "" {
sqlUrl += " search_path=" + schema
+ adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
store.DB.Close()
store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
index 82552376f..92893bf7a 100644
--- a/weed/filer/postgres2/postgres2_store.go
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
@@ -40,40 +41,46 @@ func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
-func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
+func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = true
store.SqlGenerator = &postgres.SqlGenPostgres{
CreateTableSqlTemplate: createTable,
- DropTableSqlTemplate: "drop table %s",
+ DropTableSqlTemplate: `drop table "%s"`,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
sqlUrl += " user=" + user
}
+ adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
+ adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
+ adaptedSqlUrl += " dbname=" + database
}
if schema != "" {
sqlUrl += " search_path=" + schema
+ adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
store.DB.Close()
store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 1f78057ef..7a6da3beb 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
- data, _, err := util.Get(target)
+ data, _, err := util.FastGet(target)
return data, err
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index f0042a0ff..075204b79 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {