diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/abstract_sql/abstract_sql_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/entry.go | 1 | ||||
| -rw-r--r-- | weed/filer/entry_codec.go | 2 | ||||
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer_conf.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer_notify.go | 5 | ||||
| -rw-r--r-- | weed/filer/filer_notify_append.go | 2 | ||||
| -rw-r--r-- | weed/filer/mysql/mysql_sql_gen.go | 14 | ||||
| -rw-r--r-- | weed/filer/mysql/mysql_store.go | 6 | ||||
| -rw-r--r-- | weed/filer/mysql2/mysql2_store.go | 6 | ||||
| -rw-r--r-- | weed/filer/postgres/postgres_sql_gen.go | 14 | ||||
| -rw-r--r-- | weed/filer/postgres/postgres_store.go | 13 | ||||
| -rw-r--r-- | weed/filer/postgres2/postgres2_store.go | 13 | ||||
| -rw-r--r-- | weed/filer/read_write.go | 2 | ||||
| -rw-r--r-- | weed/filer/stream.go | 2 |
15 files changed, 55 insertions, 31 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 91b0bc98f..07ce56145 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -107,7 +107,7 @@ func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.Full } if _, found := store.dbs[bucket]; !found { - if err = store.CreateTable(ctx, bucket); err != nil { + if err = store.CreateTable(ctx, bucket); err == nil { store.dbs[bucket] = true } } diff --git a/weed/filer/entry.go b/weed/filer/entry.go index dbe10c9b1..b7c8370e6 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -18,6 +18,7 @@ type Attr struct { Replication string // replication Collection string // collection name TtlSec int32 // ttl in seconds + DiskType string UserName string GroupNames []string SymlinkTarget string diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index 1693b551e..4c613f068 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -56,6 +56,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes { Collection: entry.Attr.Collection, Replication: entry.Attr.Replication, TtlSec: entry.Attr.TtlSec, + DiskType: entry.Attr.DiskType, UserName: entry.Attr.UserName, GroupName: entry.Attr.GroupNames, SymlinkTarget: entry.Attr.SymlinkTarget, @@ -81,6 +82,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t.Collection = attr.Collection t.Replication = attr.Replication t.TtlSec = attr.TtlSec + t.DiskType = attr.DiskType t.UserName = attr.UserName t.GroupNames = attr.GroupName t.SymlinkTarget = attr.SymlinkTarget diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 845bfaec1..99a62c90c 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { buffer.Write(data) }) if !shouldRetry { diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 18ed37abd..b4f0e5890 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -116,7 +116,7 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) { a.Collection = util.Nvl(b.Collection, a.Collection) a.Replication = util.Nvl(b.Replication, a.Replication) a.Ttl = util.Nvl(b.Ttl, a.Ttl) - if b.DiskType != filer_pb.FilerConf_PathConf_NONE { + if b.DiskType != "" { a.DiskType = b.DiskType } a.Fsync = b.Fsync || a.Fsync diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index f3a795ad0..c461a82b8 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -55,7 +55,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry if notification.Queue != nil { glog.V(3).Infof("notifying entry update %v", fullpath) - notification.Queue.SendMessage(fullpath, eventNotification) + if err := notification.Queue.SendMessage(fullpath, eventNotification); err != nil { + // throw message + glog.Error(err) + } } f.logMetaEvent(ctx, fullpath, eventNotification) diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go index 09c39dd89..d441bbbc9 100644 --- a/weed/filer/filer_notify_append.go +++ b/weed/filer/filer_notify_append.go @@ -56,7 +56,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi WritableVolumeCount: rule.VolumeGrowthCount, } - assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest) if err != nil { return nil, nil, fmt.Errorf("AssignVolume: %v", err) } diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go index 057484c37..4213cf965 100644 --- a/weed/filer/mysql/mysql_sql_gen.go +++ b/weed/filer/mysql/mysql_sql_gen.go @@ -16,31 +16,31 @@ var ( ) func (gen *SqlGenMysql) GetSqlInsert(bucket string) string { - return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket) + return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket) } func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string { - return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket) + return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket) } func (gen *SqlGenMysql) GetSqlFind(bucket string) string { - return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket) + return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket) } func (gen *SqlGenMysql) GetSqlDelete(bucket string) string { - return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket) + return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket) } func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string { - return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", bucket) + return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", bucket) } func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) + return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) } func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) + return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) } func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string { diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index 686628740..501ab1d39 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -47,12 +47,14 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d store.SupportBucketTable = false store.SqlGenerator = &SqlGenMysql{ CreateTableSqlTemplate: "", - DropTableSqlTemplate: "drop table %s", + DropTableSqlTemplate: "drop table `%s`", } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) + adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database) if interpolateParams { sqlUrl += "&interpolateParams=true" + adaptedSqlUrl += "&interpolateParams=true" } var dbErr error @@ -60,7 +62,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d if dbErr != nil { store.DB.Close() store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) } store.DB.SetMaxIdleConns(maxIdle) diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go index 15216b651..c796cd6aa 100644 --- a/weed/filer/mysql2/mysql2_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -50,12 +50,14 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin store.SupportBucketTable = true store.SqlGenerator = &mysql.SqlGenMysql{ CreateTableSqlTemplate: createTable, - DropTableSqlTemplate: "drop table %s", + DropTableSqlTemplate: "drop table `%s`", } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) + adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database) if interpolateParams { sqlUrl += "&interpolateParams=true" + adaptedSqlUrl += "&interpolateParams=true" } var dbErr error @@ -63,7 +65,7 @@ func (store *MysqlStore2) initialize(createTable, user, password, hostname strin if dbErr != nil { store.DB.Close() store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) } store.DB.SetMaxIdleConns(maxIdle) diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go index 284cf254b..e13070c3d 100644 --- a/weed/filer/postgres/postgres_sql_gen.go +++ b/weed/filer/postgres/postgres_sql_gen.go @@ -17,31 +17,31 @@ var ( ) func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string { - return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket) + return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, bucket) } func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string { - return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket) + return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, bucket) } func (gen *SqlGenPostgres) GetSqlFind(bucket string) string { - return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket) + return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket) } func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string { - return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket) + return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket) } func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string { - return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket) + return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, bucket) } func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket) + return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket) } func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket) + return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket) } func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string { diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go index 27c6278c7..9e4ff7c32 100644 --- a/weed/filer/postgres/postgres_store.go +++ b/weed/filer/postgres/postgres_store.go @@ -3,6 +3,7 @@ package postgres import ( "database/sql" "fmt" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" @@ -37,40 +38,46 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix configuration.GetString(prefix+"sslmode"), configuration.GetInt(prefix+"connection_max_idle"), configuration.GetInt(prefix+"connection_max_open"), + configuration.GetInt(prefix+"connection_max_lifetime_seconds"), ) } -func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) { +func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { store.SupportBucketTable = false store.SqlGenerator = &SqlGenPostgres{ CreateTableSqlTemplate: "", - DropTableSqlTemplate: "drop table %s", + DropTableSqlTemplate: `drop table "%s"`, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) if user != "" { sqlUrl += " user=" + user } + adaptedSqlUrl := sqlUrl if password != "" { sqlUrl += " password=" + password + adaptedSqlUrl += " password=ADAPTED" } if database != "" { sqlUrl += " dbname=" + database + adaptedSqlUrl += " dbname=" + database } if schema != "" { sqlUrl += " search_path=" + schema + adaptedSqlUrl += " search_path=" + schema } var dbErr error store.DB, dbErr = sql.Open("postgres", sqlUrl) if dbErr != nil { store.DB.Close() store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) } store.DB.SetMaxIdleConns(maxIdle) store.DB.SetMaxOpenConns(maxOpen) + store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second) if err = store.DB.Ping(); err != nil { return fmt.Errorf("connect to %s error:%v", sqlUrl, err) diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go index 82552376f..92893bf7a 100644 --- a/weed/filer/postgres2/postgres2_store.go +++ b/weed/filer/postgres2/postgres2_store.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" @@ -40,40 +41,46 @@ func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix configuration.GetString(prefix+"sslmode"), configuration.GetInt(prefix+"connection_max_idle"), configuration.GetInt(prefix+"connection_max_open"), + configuration.GetInt(prefix+"connection_max_lifetime_seconds"), ) } -func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) { +func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { store.SupportBucketTable = true store.SqlGenerator = &postgres.SqlGenPostgres{ CreateTableSqlTemplate: createTable, - DropTableSqlTemplate: "drop table %s", + DropTableSqlTemplate: `drop table "%s"`, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) if user != "" { sqlUrl += " user=" + user } + adaptedSqlUrl := sqlUrl if password != "" { sqlUrl += " password=" + password + adaptedSqlUrl += " password=ADAPTED" } if database != "" { sqlUrl += " dbname=" + database + adaptedSqlUrl += " dbname=" + database } if schema != "" { sqlUrl += " search_path=" + schema + adaptedSqlUrl += " search_path=" + schema } var dbErr error store.DB, dbErr = sql.Open("postgres", sqlUrl) if dbErr != nil { store.DB.Close() store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) } store.DB.SetMaxIdleConns(maxIdle) store.DB.SetMaxOpenConns(maxOpen) + store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second) if err = store.DB.Ping(); err != nil { return fmt.Errorf("connect to %s error:%v", sqlUrl, err) diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 1f78057ef..7a6da3beb 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) { target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name) - data, _, err := util.Get(target) + data, _, err := util.FastGet(target) return data, err } diff --git a/weed/filer/stream.go b/weed/filer/stream.go index f0042a0ff..075204b79 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { |
