diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/filer.go | 6 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway_buckets.go | 18 | ||||
| -rw-r--r-- | weed/command/s3.go | 4 | ||||
| -rw-r--r-- | weed/command/scaffold/filer.toml | 3 | ||||
| -rw-r--r-- | weed/command/scaffold/security.toml | 5 | ||||
| -rw-r--r-- | weed/command/server.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer.go | 3 | ||||
| -rw-r--r-- | weed/filer/mysql/mysql_store.go | 25 | ||||
| -rw-r--r-- | weed/s3api/s3api_handlers.go | 4 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 38 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 10 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers.go | 46 |
12 files changed, 139 insertions, 25 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index fe0beb5b8..50fc4492b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -61,6 +61,7 @@ type FilerOptions struct { showUIDirectoryDelete *bool downloadMaxMBps *int diskType *string + allowedOrigins *string } func init() { @@ -91,6 +92,7 @@ func init() { f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button") f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second") f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") + f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -229,6 +231,9 @@ func (fo *FilerOptions) startFiler() { if *fo.bindIp == "" { *fo.bindIp = *fo.ip } + if *fo.allowedOrigins == "" { + *fo.allowedOrigins = "*" + } defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2") @@ -253,6 +258,7 @@ func (fo *FilerOptions) startFiler() { ShowUIDirectoryDelete: *fo.showUIDirectoryDelete, DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024, DiskType: *fo.diskType, + AllowedOrigins: strings.Split(*fo.allowedOrigins, ","), }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 9694a1c9c..912607847 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -30,10 +30,20 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo return err } - processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { - lastTime := time.Unix(0, lastTsNs) - glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) + processor := NewMetadataProcessor(eachEntryFunc, 128) + + var lastLogTsNs = time.Now().UnixNano() + processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { + processor.AddSyncJob(resp) + return nil + }, 3*time.Second, func(counter int64, lastTsNs int64) error { + if processor.processedTsWatermark == 0 { + return nil + } + now := time.Now().UnixNano() + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + lastLogTsNs = now + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark) }) lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) diff --git a/weed/command/s3.go b/weed/command/s3.go index dc943b23d..b7bb2a546 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "runtime" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" @@ -42,6 +43,7 @@ type S3Options struct { portGrpc *int config *string domainName *string + allowedOrigins *string tlsPrivateKey *string tlsCertificate *string tlsCACertificate *string @@ -64,6 +66,7 @@ func init() { s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port") s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") + s3StandaloneOptions.allowedOrigins = cmdS3.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins") s3StandaloneOptions.dataCenter = cmdS3.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file") @@ -220,6 +223,7 @@ func (s3opt *S3Options) startS3Server() bool { Port: *s3opt.port, Config: *s3opt.config, DomainName: *s3opt.domainName, + AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","), BucketsPath: filerBucketsPath, GrpcDialOption: grpcDialOption, AllowEmptyFolder: *s3opt.allowEmptyFolder, diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 55876bea0..231e7510a 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -51,6 +51,9 @@ dbFile = "./filer.db" # sqlite db file # ) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; enabled = false +# dsn will take priority over "hostname, port, username, password, database". +# [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] +dsn = "root@tcp(localhost:3306)/seaweedfs?collation=utf8mb4_bin" hostname = "localhost" port = 3306 username = "root" diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml index e5452cdff..9626ee58c 100644 --- a/weed/command/scaffold/security.toml +++ b/weed/command/scaffold/security.toml @@ -4,6 +4,11 @@ # /etc/seaweedfs/security.toml # this file is read by master, volume server, and filer +# comma separated origins allowed to make requests to the filer and s3 gateway. +# enter in this format: https://domain.com, or http://localhost:port +[cors.allowed_origins] +values = "*" + # this jwt signing key is read by master and volume server, and it is used for write operations: # - the Master server generates the JWT, which can be used to write a certain file on a volume server # - the Volume server validates the JWT on writing diff --git a/weed/command/server.go b/weed/command/server.go index 67e37426e..9631f6bfd 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -106,6 +106,7 @@ func init() { filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port") filerOptions.portGrpc = cmdServer.Flag.Int("filer.port.grpc", 0, "filer server grpc listen port") filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port") + filerOptions.allowedOrigins = cmdServer.Flag.String("filer.allowedOrigins", "*", "comma separated list of allowed origins") filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.") filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing") filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 4, "split files larger than the limit") @@ -142,6 +143,7 @@ func init() { s3Options.portHttps = cmdServer.Flag.Int("s3.port.https", 0, "s3 server https listen port") s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") + s3Options.allowedOrigins = cmdServer.Flag.String("s3.allowedOrigins", "*", "comma separated list of allowed origins") s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file") s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file") s3Options.tlsCACertificate = cmdServer.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file") diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 239263ca8..1c6b3c338 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,12 +3,13 @@ package filer import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "os" "sort" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index 14566d49b..f1a246575 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -3,6 +3,8 @@ package mysql import ( "database/sql" "fmt" + "github.com/go-sql-driver/mysql" + "strings" "time" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -30,6 +32,7 @@ func (store *MysqlStore) GetName() string { func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( + configuration.GetString(prefix+"dsn"), configuration.GetString(prefix+"upsertQuery"), configuration.GetBool(prefix+"enableUpsert"), configuration.GetString(prefix+"username"), @@ -44,7 +47,7 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str ) } -func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen, +func (store *MysqlStore) initialize(dsn string, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen, maxLifetimeSeconds int, interpolateParams bool) (err error) { store.SupportBucketTable = false @@ -57,19 +60,23 @@ func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, UpsertQueryTemplate: upsertQuery, } - sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) - adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database) - if interpolateParams { - sqlUrl += "&interpolateParams=true" - adaptedSqlUrl += "&interpolateParams=true" + if dsn == "" { + dsn = fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) + if interpolateParams { + dsn += "&interpolateParams=true" + } + } + cfg, err := mysql.ParseDSN(dsn) + if err != nil { + return fmt.Errorf("can not parse DSN error:%v", err) } var dbErr error - store.DB, dbErr = sql.Open("mysql", sqlUrl) + store.DB, dbErr = sql.Open("mysql", dsn) if dbErr != nil { store.DB.Close() store.DB = nil - return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err) + return fmt.Errorf("can not connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err) } store.DB.SetMaxIdleConns(maxIdle) @@ -77,7 +84,7 @@ func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second) if err = store.DB.Ping(); err != nil { - return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + return fmt.Errorf("connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err) } return nil diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 81d7017dc..c146a8b15 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -40,6 +40,10 @@ func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) { s3err.WriteEmptyResponse(w, r, http.StatusOK) } +func writeFailureResponse(w http.ResponseWriter, r *http.Request, errCode s3err.ErrorCode) { + s3err.WriteErrorResponse(w, r, errCode) +} + func validateContentMd5(h http.Header) ([]byte, error) { md5B64, ok := h["Content-Md5"] if ok { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index e90c334aa..37ae54f1b 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -3,15 +3,16 @@ package s3api import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" - "github.com/seaweedfs/seaweedfs/weed/util/grace" "net" "net/http" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" + "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/pb" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" @@ -26,6 +27,7 @@ type S3ApiServerOption struct { Port int Config string DomainName string + AllowedOrigins []string BucketsPath string GrpcDialOption grpc.DialOption AllowEmptyFolder bool @@ -56,6 +58,14 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + v.SetDefault("cors.allowed_origins.values", "*") + + if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) { + allowedOrigins := v.GetString("cors.allowed_origins.values") + domains := strings.Split(allowedOrigins, ",") + option.AllowedOrigins = domains + } + s3ApiServer = &S3ApiServer{ option: option, iam: NewIdentityAccessManagement(option), @@ -103,7 +113,25 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { apiRouter.Methods("OPTIONS").HandlerFunc( func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") + origin := r.Header.Get("Origin") + if origin != "" { + if s3a.option.AllowedOrigins == nil || len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" { + origin = "*" + } else { + originFound := false + for _, allowedOrigin := range s3a.option.AllowedOrigins { + if origin == allowedOrigin { + originFound = true + } + } + if !originFound { + writeFailureResponse(w, r, http.StatusForbidden) + return + } + } + } + + w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Expose-Headers", "*") w.Header().Set("Access-Control-Allow-Methods", "*") w.Header().Set("Access-Control-Allow-Headers", "*") diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 1b50d47c7..20b5151cd 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "strings" "sync" "time" @@ -70,6 +71,7 @@ type FilerOption struct { ShowUIDirectoryDelete bool DownloadMaxBytesPs int64 DiskType string + AllowedOrigins []string } type FilerServer struct { @@ -107,6 +109,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + v.SetDefault("cors.allowed_origins.values", "*") + + if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) { + allowedOrigins := v.GetString("cors.allowed_origins.values") + domains := strings.Split(allowedOrigins, ",") + option.AllowedOrigins = domains + } + fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 6bfae3dc1..d71b60d70 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -3,6 +3,7 @@ package weed_server import ( "errors" "net/http" + "os" "strings" "sync/atomic" "time" @@ -17,8 +18,24 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() - if r.Header.Get("Origin") != "" { - w.Header().Set("Access-Control-Allow-Origin", "*") + origin := r.Header.Get("Origin") + if origin != "" { + if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" { + origin = "*" + } else { + originFound := false + for _, allowedOrigin := range fs.option.AllowedOrigins { + if origin == allowedOrigin { + originFound = true + } + } + if !originFound { + writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed")) + return + } + } + + w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Expose-Headers", "*") w.Header().Set("Access-Control-Allow-Headers", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") @@ -99,9 +116,27 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque start := time.Now() - if r.Header.Get("Origin") != "" { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "*") + os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n") + + origin := r.Header.Get("Origin") + if origin != "" { + if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" { + origin = "*" + } else { + originFound := false + for _, allowedOrigin := range fs.option.AllowedOrigins { + if origin == allowedOrigin { + originFound = true + } + } + if !originFound { + writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed")) + return + } + } + + w.Header().Set("Access-Control-Allow-Origin", origin) + w.Header().Set("Access-Control-Allow-Headers", "OPTIONS, GET, HEAD") w.Header().Set("Access-Control-Allow-Credentials", "true") } @@ -137,7 +172,6 @@ func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) { w.Header().Set("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS") w.Header().Set("Access-Control-Expose-Headers", "*") } - w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") } |
