aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/filer.go6
-rw-r--r--weed/command/filer_remote_gateway_buckets.go18
-rw-r--r--weed/command/s3.go4
-rw-r--r--weed/command/scaffold/filer.toml3
-rw-r--r--weed/command/scaffold/security.toml5
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/filer/filer.go3
-rw-r--r--weed/filer/mysql/mysql_store.go25
-rw-r--r--weed/s3api/s3api_handlers.go4
-rw-r--r--weed/s3api/s3api_server.go38
-rw-r--r--weed/server/filer_server.go10
-rw-r--r--weed/server/filer_server_handlers.go46
12 files changed, 139 insertions, 25 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index fe0beb5b8..50fc4492b 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -61,6 +61,7 @@ type FilerOptions struct {
showUIDirectoryDelete *bool
downloadMaxMBps *int
diskType *string
+ allowedOrigins *string
}
func init() {
@@ -91,6 +92,7 @@ func init() {
f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -229,6 +231,9 @@ func (fo *FilerOptions) startFiler() {
if *fo.bindIp == "" {
*fo.bindIp = *fo.ip
}
+ if *fo.allowedOrigins == "" {
+ *fo.allowedOrigins = "*"
+ }
defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
@@ -253,6 +258,7 @@ func (fo *FilerOptions) startFiler() {
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
DiskType: *fo.diskType,
+ AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index 9694a1c9c..912607847 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -30,10 +30,20 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
return err
}
- processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
- lastTime := time.Unix(0, lastTsNs)
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
+ processor := NewMetadataProcessor(eachEntryFunc, 128)
+
+ var lastLogTsNs = time.Now().UnixNano()
+ processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
+ processor.AddSyncJob(resp)
+ return nil
+ }, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ if processor.processedTsWatermark == 0 {
+ return nil
+ }
+ now := time.Now().UnixNano()
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ lastLogTsNs = now
+ return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark)
})
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index dc943b23d..b7bb2a546 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"runtime"
+ "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -42,6 +43,7 @@ type S3Options struct {
portGrpc *int
config *string
domainName *string
+ allowedOrigins *string
tlsPrivateKey *string
tlsCertificate *string
tlsCACertificate *string
@@ -64,6 +66,7 @@ func init() {
s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port")
s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
+ s3StandaloneOptions.allowedOrigins = cmdS3.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins")
s3StandaloneOptions.dataCenter = cmdS3.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file")
@@ -220,6 +223,7 @@ func (s3opt *S3Options) startS3Server() bool {
Port: *s3opt.port,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
+ AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","),
BucketsPath: filerBucketsPath,
GrpcDialOption: grpcDialOption,
AllowEmptyFolder: *s3opt.allowEmptyFolder,
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 55876bea0..231e7510a 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -51,6 +51,9 @@ dbFile = "./filer.db" # sqlite db file
# ) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
enabled = false
+# dsn will take priority over "hostname, port, username, password, database".
+# [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
+dsn = "root@tcp(localhost:3306)/seaweedfs?collation=utf8mb4_bin"
hostname = "localhost"
port = 3306
username = "root"
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
index e5452cdff..9626ee58c 100644
--- a/weed/command/scaffold/security.toml
+++ b/weed/command/scaffold/security.toml
@@ -4,6 +4,11 @@
# /etc/seaweedfs/security.toml
# this file is read by master, volume server, and filer
+# comma separated origins allowed to make requests to the filer and s3 gateway.
+# enter in this format: https://domain.com, or http://localhost:port
+[cors.allowed_origins]
+values = "*"
+
# this jwt signing key is read by master and volume server, and it is used for write operations:
# - the Master server generates the JWT, which can be used to write a certain file on a volume server
# - the Volume server validates the JWT on writing
diff --git a/weed/command/server.go b/weed/command/server.go
index 67e37426e..9631f6bfd 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -106,6 +106,7 @@ func init() {
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
filerOptions.portGrpc = cmdServer.Flag.Int("filer.port.grpc", 0, "filer server grpc listen port")
filerOptions.publicPort = cmdServer.Flag.Int("filer.port.public", 0, "filer server public http listen port")
+ filerOptions.allowedOrigins = cmdServer.Flag.String("filer.allowedOrigins", "*", "comma separated list of allowed origins")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 4, "split files larger than the limit")
@@ -142,6 +143,7 @@ func init() {
s3Options.portHttps = cmdServer.Flag.Int("s3.port.https", 0, "s3 server https listen port")
s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
+ s3Options.allowedOrigins = cmdServer.Flag.String("s3.allowedOrigins", "*", "comma separated list of allowed origins")
s3Options.tlsPrivateKey = cmdServer.Flag.String("s3.key.file", "", "path to the TLS private key file")
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.tlsCACertificate = cmdServer.Flag.String("s3.cacert.file", "", "path to the TLS CA certificate file")
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 239263ca8..1c6b3c338 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,12 +3,13 @@ package filer
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"os"
"sort"
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
+
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 14566d49b..f1a246575 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -3,6 +3,8 @@ package mysql
import (
"database/sql"
"fmt"
+ "github.com/go-sql-driver/mysql"
+ "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -30,6 +32,7 @@ func (store *MysqlStore) GetName() string {
func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
+ configuration.GetString(prefix+"dsn"),
configuration.GetString(prefix+"upsertQuery"),
configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
@@ -44,7 +47,7 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
)
}
-func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
+func (store *MysqlStore) initialize(dsn string, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
maxLifetimeSeconds int, interpolateParams bool) (err error) {
store.SupportBucketTable = false
@@ -57,19 +60,23 @@ func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user,
UpsertQueryTemplate: upsertQuery,
}
- sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
- adaptedSqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, "<ADAPTED>", hostname, port, database)
- if interpolateParams {
- sqlUrl += "&interpolateParams=true"
- adaptedSqlUrl += "&interpolateParams=true"
+ if dsn == "" {
+ dsn = fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ dsn += "&interpolateParams=true"
+ }
+ }
+ cfg, err := mysql.ParseDSN(dsn)
+ if err != nil {
+ return fmt.Errorf("can not parse DSN error:%v", err)
}
var dbErr error
- store.DB, dbErr = sql.Open("mysql", sqlUrl)
+ store.DB, dbErr = sql.Open("mysql", dsn)
if dbErr != nil {
store.DB.Close()
store.DB = nil
- return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, err)
+ return fmt.Errorf("can not connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err)
}
store.DB.SetMaxIdleConns(maxIdle)
@@ -77,7 +84,7 @@ func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user,
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
- return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ return fmt.Errorf("connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err)
}
return nil
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 81d7017dc..c146a8b15 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -40,6 +40,10 @@ func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) {
s3err.WriteEmptyResponse(w, r, http.StatusOK)
}
+func writeFailureResponse(w http.ResponseWriter, r *http.Request, errCode s3err.ErrorCode) {
+ s3err.WriteErrorResponse(w, r, errCode)
+}
+
func validateContentMd5(h http.Header) ([]byte, error) {
md5B64, ok := h["Content-Md5"]
if ok {
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index e90c334aa..37ae54f1b 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -3,15 +3,16 @@ package s3api
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/grace"
"net"
"net/http"
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/grace"
+
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/pb"
. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@@ -26,6 +27,7 @@ type S3ApiServerOption struct {
Port int
Config string
DomainName string
+ AllowedOrigins []string
BucketsPath string
GrpcDialOption grpc.DialOption
AllowEmptyFolder bool
@@ -56,6 +58,14 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
+ v.SetDefault("cors.allowed_origins.values", "*")
+
+ if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) {
+ allowedOrigins := v.GetString("cors.allowed_origins.values")
+ domains := strings.Split(allowedOrigins, ",")
+ option.AllowedOrigins = domains
+ }
+
s3ApiServer = &S3ApiServer{
option: option,
iam: NewIdentityAccessManagement(option),
@@ -103,7 +113,25 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
apiRouter.Methods("OPTIONS").HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Access-Control-Allow-Origin", "*")
+ origin := r.Header.Get("Origin")
+ if origin != "" {
+ if s3a.option.AllowedOrigins == nil || len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" {
+ origin = "*"
+ } else {
+ originFound := false
+ for _, allowedOrigin := range s3a.option.AllowedOrigins {
+ if origin == allowedOrigin {
+ originFound = true
+ }
+ }
+ if !originFound {
+ writeFailureResponse(w, r, http.StatusForbidden)
+ return
+ }
+ }
+ }
+
+ w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Expose-Headers", "*")
w.Header().Set("Access-Control-Allow-Methods", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 1b50d47c7..20b5151cd 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
+ "strings"
"sync"
"time"
@@ -70,6 +71,7 @@ type FilerOption struct {
ShowUIDirectoryDelete bool
DownloadMaxBytesPs int64
DiskType string
+ AllowedOrigins []string
}
type FilerServer struct {
@@ -107,6 +109,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
+ v.SetDefault("cors.allowed_origins.values", "*")
+
+ if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) {
+ allowedOrigins := v.GetString("cors.allowed_origins.values")
+ domains := strings.Split(allowedOrigins, ",")
+ option.AllowedOrigins = domains
+ }
+
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 6bfae3dc1..d71b60d70 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -3,6 +3,7 @@ package weed_server
import (
"errors"
"net/http"
+ "os"
"strings"
"sync/atomic"
"time"
@@ -17,8 +18,24 @@ import (
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now()
- if r.Header.Get("Origin") != "" {
- w.Header().Set("Access-Control-Allow-Origin", "*")
+ origin := r.Header.Get("Origin")
+ if origin != "" {
+ if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" {
+ origin = "*"
+ } else {
+ originFound := false
+ for _, allowedOrigin := range fs.option.AllowedOrigins {
+ if origin == allowedOrigin {
+ originFound = true
+ }
+ }
+ if !originFound {
+ writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed"))
+ return
+ }
+ }
+
+ w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Expose-Headers", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
@@ -99,9 +116,27 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
start := time.Now()
- if r.Header.Get("Origin") != "" {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "*")
+ os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n")
+
+ origin := r.Header.Get("Origin")
+ if origin != "" {
+ if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" {
+ origin = "*"
+ } else {
+ originFound := false
+ for _, allowedOrigin := range fs.option.AllowedOrigins {
+ if origin == allowedOrigin {
+ originFound = true
+ }
+ }
+ if !originFound {
+ writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed"))
+ return
+ }
+ }
+
+ w.Header().Set("Access-Control-Allow-Origin", origin)
+ w.Header().Set("Access-Control-Allow-Headers", "OPTIONS, GET, HEAD")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}
@@ -137,7 +172,6 @@ func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
w.Header().Set("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
w.Header().Set("Access-Control-Expose-Headers", "*")
}
- w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}