Diffstat (limited to 'weed')
-rw-r--r--  weed/cluster/cluster.go | 10
-rw-r--r--  weed/cluster/cluster_test.go | 34
-rw-r--r--  weed/command/command.go | 4
-rw-r--r--  weed/command/filer.go | 26
-rw-r--r--  weed/command/filer_sync.go | 4
-rw-r--r--  weed/command/imports.go | 2
-rw-r--r--  weed/command/master.go | 5
-rw-r--r--  weed/command/mount.go | 2
-rw-r--r--  weed/command/scaffold/filer.toml | 10
-rw-r--r--  weed/command/server.go | 1
-rw-r--r--  weed/command/update.go | 382
-rw-r--r--  weed/command/update_full.go | 9
-rw-r--r--  weed/command/volume.go | 5
-rw-r--r--  weed/filer/filechunk_manifest.go | 7
-rw-r--r--  weed/filer/filechunks.go | 8
-rw-r--r--  weed/filer/s3iam_conf.go | 3
-rw-r--r--  weed/filer/tikv/tikv.go | 6
-rw-r--r--  weed/filer/tikv/tikv_store.go | 396
-rw-r--r--  weed/filer/tikv/tikv_store_kv.go | 50
-rw-r--r--  weed/mount/inode_to_path.go | 15
-rw-r--r--  weed/mount/weedfs_dir_mkrm.go | 2
-rw-r--r--  weed/pb/filer_pb/signature.go | 13
-rw-r--r--  weed/pb/grpc_client_server.go | 5
-rw-r--r--  weed/pb/remote.proto | 5
-rw-r--r--  weed/pb/s3.proto | 10
-rw-r--r--  weed/pb/s3_pb/s3.pb.go | 221
-rw-r--r--  weed/remote_storage/hdfs/doc.go | 9
-rw-r--r--  weed/remote_storage/hdfs/hdfs_kerberos.go | 58
-rw-r--r--  weed/remote_storage/hdfs/hdfs_storage_client.go | 194
-rw-r--r--  weed/s3api/auth_credentials_subscribe.go | 33
-rw-r--r--  weed/s3api/s3_constants/s3_config.go | 18
-rw-r--r--  weed/s3api/s3api_circuit_breaker.go | 183
-rw-r--r--  weed/s3api/s3api_circuit_breaker_test.go | 107
-rw-r--r--  weed/s3api/s3api_object_copy_handlers.go | 8
-rw-r--r--  weed/s3api/s3api_object_multipart_handlers.go | 17
-rw-r--r--  weed/s3api/s3api_server.go | 84
-rw-r--r--  weed/s3api/s3err/s3api_errors.go | 13
-rw-r--r--  weed/s3api/stats.go | 6
-rw-r--r--  weed/security/tls.go | 140
-rw-r--r--  weed/sequence/snowflake_sequencer_test.go | 25
-rw-r--r--  weed/server/common.go | 4
-rw-r--r--  weed/server/master_grpc_server.go | 8
-rw-r--r--  weed/server/master_grpc_server_volume.go | 10
-rw-r--r--  weed/server/master_server_handlers_admin.go | 5
-rw-r--r--  weed/server/raft_hashicorp.go | 5
-rw-r--r--  weed/server/raft_server.go | 2
-rw-r--r--  weed/server/volume_grpc_copy.go | 24
-rw-r--r--  weed/server/volume_server.go | 6
-rw-r--r--  weed/server/volume_server_handlers.go | 28
-rw-r--r--  weed/server/webdav_server.go | 8
-rw-r--r--  weed/shell/command_remote_configure.go | 21
-rw-r--r--  weed/shell/command_s3_circuitbreaker.go | 358
-rw-r--r--  weed/shell/command_s3_circuitbreaker_test.go | 292
-rw-r--r--  weed/stats/metrics.go | 5
-rw-r--r--  weed/storage/disk_location.go | 7
-rw-r--r--  weed/storage/needle_map/compact_map.go | 2
-rw-r--r--  weed/storage/needle_map/compact_map_test.go | 15
-rw-r--r--  weed/topology/topology_vacuum.go | 6
-rw-r--r--  weed/topology/volume_growth.go | 39
-rw-r--r--  weed/topology/volume_layout.go | 4
-rw-r--r--  weed/util/constants.go | 2
-rw-r--r--  weed/wdclient/masterclient.go | 56
-rw-r--r--  weed/wdclient/vid_map.go | 6
63 files changed, 2518 insertions, 525 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index 6c24df44c..ad6e6b879 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -46,8 +46,6 @@ func NewCluster() *Cluster {
}
func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
filers, found := cluster.filerGroup2filers[filerGroup]
if !found && createIfNotFound {
filers = &Filers{
@@ -63,6 +61,8 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
filers := cluster.getFilers(filerGroup, true)
if existingNode, found := filers.filers[address]; found {
existingNode.counter++
@@ -115,6 +115,8 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
filers := cluster.getFilers(filerGroup, false)
if filers == nil {
return nil
@@ -165,12 +167,12 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
switch nodeType {
case FilerType:
+ cluster.filersLock.RLock()
+ defer cluster.filersLock.RUnlock()
filers := cluster.getFilers(filerGroup, false)
if filers == nil {
return
}
- cluster.filersLock.RLock()
- defer cluster.filersLock.RUnlock()
for _, node := range filers.filers {
nodes = append(nodes, node)
}
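
Note: this change hoists filersLock from getFilers into its callers, so the map
lookup and the follow-up mutation run inside one critical section. A minimal
sketch of the resulting pattern (names taken from this diff):

    // Callers now take the lock, then look up and mutate atomically:
    cluster.filersLock.Lock()
    defer cluster.filersLock.Unlock()
    filers := cluster.getFilers(filerGroup, true) // lock already held
    filers.filers[address] = node                 // same critical section, no race
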
diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go
index ccaccf6f7..1187642de 100644
--- a/weed/cluster/cluster_test.go
+++ b/weed/cluster/cluster_test.go
@@ -3,6 +3,8 @@ package cluster
import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/stretchr/testify/assert"
+ "strconv"
+ "sync"
"testing"
)
@@ -45,3 +47,35 @@ func TestClusterAddRemoveNodes(t *testing.T) {
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
}
+
+func TestConcurrentAddRemoveNodes(t *testing.T) {
+ c := NewCluster()
+ var wg sync.WaitGroup
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ address := strconv.Itoa(i)
+ c.AddClusterNode("", "filer", pb.ServerAddress(address), "23.45")
+ }(i)
+ }
+ wg.Wait()
+
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ address := strconv.Itoa(i)
+ node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address))
+
+ if len(node) == 0 {
+ t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address)
+ return
+ } else if node[0].ClusterNodeUpdate.Address != address {
+ t.Errorf("TestConcurrentAddRemoveNodes: expect:%s, actual:%s", address, node[0].ClusterNodeUpdate.Address)
+ return
+ }
+ }(i)
+ }
+ wg.Wait()
+}
diff --git a/weed/command/command.go b/weed/command/command.go
index dbc18a053..7635405dc 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -2,9 +2,10 @@ package command
import (
"fmt"
- flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
"os"
"strings"
+
+ flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
)
var Commands = []*Command{
@@ -36,6 +37,7 @@ var Commands = []*Command{
cmdScaffold,
cmdServer,
cmdShell,
+ cmdUpdate,
cmdUpload,
cmdVersion,
cmdVolume,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index c9f9a1956..7e0e92d4a 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -6,10 +6,13 @@ import (
"net/http"
"os"
"runtime"
+ "sort"
+ "strings"
"time"
"google.golang.org/grpc/reflection"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -114,10 +117,8 @@ func init() {
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
-var cmdFiler = &Command{
- UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
- Short: "start a file server that points to a master server, or a list of master servers",
- Long: `start a file server which accepts REST operation for any files.
+func filerLongDesc() string {
+ desc := `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
@@ -133,7 +134,22 @@ var cmdFiler = &Command{
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
-`,
+Supported Filer Stores:
+`
+
+ storeNames := make([]string, len(filer.Stores))
+ for i, store := range filer.Stores {
+ storeNames[i] = "\t" + store.GetName()
+ }
+ sort.Strings(storeNames)
+ storeList := strings.Join(storeNames, "\n")
+ return desc + storeList
+}
+
+var cmdFiler = &Command{
+ UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
+ Short: "start a file server that points to a master server, or a list of master servers",
+ Long: filerLongDesc(),
}
func runFiler(cmd *Command, args []string) bool {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index b7da1baf9..1550d155a 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -215,10 +215,10 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
return persistEventFn(resp)
}
- var lastLogTsNs = time.Now().Nanosecond()
+ var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
- now := time.Now().Nanosecond()
+ now := time.Now().UnixNano()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
// collect synchronous offset
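
Note: time.Now().Nanosecond() is only the sub-second component (0..999999999)
and wraps every second, while UnixNano() is the full timestamp in nanoseconds.
The rate calculation divides by (now - lastLogTsNs), which is only meaningful
with the latter:

    t := time.Now()
    t.Nanosecond() // e.g. 123456789, resets each second
    t.UnixNano()   // e.g. 1656000000123456789, full epoch timestamp
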
diff --git a/weed/command/imports.go b/weed/command/imports.go
index 04079b162..afdbc5a10 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -5,7 +5,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs"
- _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
@@ -32,5 +31,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
_ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
)
diff --git a/weed/command/master.go b/weed/command/master.go
index 9587df055..ab8466d47 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,9 +1,11 @@
package command
import (
+ "fmt"
"golang.org/x/exp/slices"
"net/http"
"os"
+ "path"
"strings"
"time"
@@ -151,11 +153,12 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
// start raftServer
+ metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port))
raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
Peers: masterPeers,
ServerAddr: myMasterAddress,
- DataDir: util.ResolvePath(*masterOption.metaFolder),
+ DataDir: util.ResolvePath(metaDir),
Topo: ms.Topo,
RaftResumeState: *masterOption.raftResumeState,
HeartbeatInterval: *masterOption.heartbeatInterval,
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 0e32a53e8..0046ca03d 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -86,7 +86,7 @@ var cmdMount = &Command{
This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on
Linux, and OS X.
- On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
+ On OS X, it requires OSXFUSE (https://osxfuse.github.io/).
`,
}
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 595fb2e62..c82de8da0 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -327,3 +327,13 @@ location = "/tmp/"
address = "localhost:6379"
password = ""
database = 1
+
+[tikv]
+enabled = false
+# If you have multiple PD addresses, separate them with ',':
+# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379"
+pdaddrs = "localhost:2379"
+# Concurrency for TiKV delete range
+deleterange_concurrency = 1
+# Enable 1PC
+enable_1pc = false
diff --git a/weed/command/server.go b/weed/command/server.go
index ba71a44bd..b1812bb9b 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -132,6 +132,7 @@ func init() {
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port")
+ serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "timeout for in-flight upload data on volume servers")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
diff --git a/weed/command/update.go b/weed/command/update.go
new file mode 100644
index 000000000..2d0dc42ad
--- /dev/null
+++ b/weed/command/update.go
@@ -0,0 +1,382 @@
+package command
+
+import (
+ "archive/tar"
+ "archive/zip"
+ "bytes"
+ "compress/gzip"
+ "context"
+ "crypto/md5"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "path/filepath"
+ "runtime"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "golang.org/x/net/context/ctxhttp"
+)
+
+//copied from https://github.com/restic/restic/tree/master/internal/selfupdate
+
+// Release collects data about a single release on GitHub.
+type Release struct {
+ Name string `json:"name"`
+ TagName string `json:"tag_name"`
+ Draft bool `json:"draft"`
+ PreRelease bool `json:"prerelease"`
+ PublishedAt time.Time `json:"published_at"`
+ Assets []Asset `json:"assets"`
+
+ Version string `json:"-"` // set manually in the code
+}
+
+// Asset is a file uploaded and attached to a release.
+type Asset struct {
+ ID int `json:"id"`
+ Name string `json:"name"`
+ URL string `json:"url"`
+}
+
+const githubAPITimeout = 30 * time.Second
+
+// githubError is returned by the GitHub API, e.g. for rate-limiting.
+type githubError struct {
+ Message string
+}
+
+// the default build is not the full version
+var isFullVersion = false
+
+var (
+ updateOpt UpdateOptions
+)
+
+type UpdateOptions struct {
+ dir *string
+ name *string
+ Version *string
+}
+
+func init() {
+ path, _ := os.Executable()
+ _, name := filepath.Split(path)
+ updateOpt.dir = cmdUpdate.Flag.String("dir", filepath.Dir(path), "directory to save new weed.")
+ updateOpt.name = cmdUpdate.Flag.String("name", name, "name of the new weed binary. On Windows, the name must differ from the original name.")
+ updateOpt.Version = cmdUpdate.Flag.String("version", "0", "specific version of weed you want to download. If not specified, get the latest version.")
+ cmdUpdate.Run = runUpdate
+}
+
+var cmdUpdate = &Command{
+ UsageLine: "update [-dir=/path/to/dir] [-name=name] [-version=x.xx]",
+ Short: "download the latest or a specific version from https://github.com/chrislusf/seaweedfs",
+ Long: `download the latest or a specific version from https://github.com/chrislusf/seaweedfs`,
+}
+
+func runUpdate(cmd *Command, args []string) bool {
+ path, _ := os.Executable()
+ _, name := filepath.Split(path)
+
+ if *updateOpt.dir != "" {
+ if err := util.TestFolderWritable(util.ResolvePath(*updateOpt.dir)); err != nil {
+ glog.Fatalf("Check Folder(-dir) Writable %s : %s", *updateOpt.dir, err)
+ return false
+ }
+ } else {
+ *updateOpt.dir = filepath.Dir(path)
+ }
+
+ if *updateOpt.name == "" {
+ *updateOpt.name = name
+ }
+
+ target := filepath.Join(*updateOpt.dir, *updateOpt.name)
+
+ if runtime.GOOS == "windows" {
+ if target == path {
+ glog.Fatalf("On Windows, the name of the new weed binary must differ from the original name.")
+ return false
+ }
+ }
+
+ glog.V(0).Infof("new weed will be saved to %s", target)
+
+ _, err := downloadRelease(context.Background(), target, *updateOpt.Version)
+ if err != nil {
+ glog.Errorf("unable to download weed: %v", err)
+ return false
+ }
+ return true
+}
+
+func downloadRelease(ctx context.Context, target string, ver string) (version string, err error) {
+ currentVersion := util.VERSION_NUMBER
+ rel, err := GitHubLatestRelease(ctx, ver, "chrislusf", "seaweedfs")
+ if err != nil {
+ return "", err
+ }
+
+ if rel.Version == currentVersion {
+ if ver == "0" {
+ glog.V(0).Infof("weed is up to date")
+ } else {
+ glog.V(0).Infof("no need to download the same version of weed ")
+ }
+ return currentVersion, nil
+ }
+
+ glog.V(0).Infof("download version: %s", rel.Version)
+
+ largeDiskSuffix := ""
+ if util.VolumeSizeLimitGB == 8000 {
+ largeDiskSuffix = "_large_disk"
+ }
+
+ fullSuffix := ""
+ if isFullVersion {
+ fullSuffix = "_full"
+ }
+
+ ext := "tar.gz"
+ if runtime.GOOS == "windows" {
+ ext = "zip"
+ }
+
+ suffix := fmt.Sprintf("%s_%s%s%s.%s", runtime.GOOS, runtime.GOARCH, fullSuffix, largeDiskSuffix, ext)
+ md5Filename := fmt.Sprintf("%s.md5", suffix)
+ _, md5Val, err := getGithubDataFile(ctx, rel.Assets, md5Filename)
+ if err != nil {
+ return "", err
+ }
+
+ downloadFilename, buf, err := getGithubDataFile(ctx, rel.Assets, suffix)
+ if err != nil {
+ return "", err
+ }
+
+ md5Ctx := md5.New()
+ md5Ctx.Write(buf)
+ binaryMd5 := md5Ctx.Sum(nil)
+ if hex.EncodeToString(binaryMd5) != string(md5Val[0:32]) {
+ glog.Errorf("md5:'%s' '%s'", hex.EncodeToString(binaryMd5), string(md5Val[0:32]))
+ err = fmt.Errorf("binary md5sum doesn't match")
+ return "", err
+ }
+
+ err = extractToFile(buf, downloadFilename, target)
+ if err != nil {
+ return "", err
+ } else {
+ glog.V(0).Infof("successfully updated weed to version %v\n", rel.Version)
+ }
+
+ return rel.Version, nil
+}
+
+// GitHubLatestRelease uses the GitHub API to get information about the specific
+// release of a repository.
+func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (Release, error) {
+ ctx, cancel := context.WithTimeout(ctx, githubAPITimeout)
+ defer cancel()
+
+ url := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases", owner, repo)
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ return Release{}, err
+ }
+
+ // pin API version 3
+ req.Header.Set("Accept", "application/vnd.github.v3+json")
+
+ res, err := ctxhttp.Do(ctx, http.DefaultClient, req)
+ if err != nil {
+ return Release{}, err
+ }
+
+ if res.StatusCode != http.StatusOK {
+ content := res.Header.Get("Content-Type")
+ if strings.Contains(content, "application/json") {
+ // try to decode error message
+ var msg githubError
+ jerr := json.NewDecoder(res.Body).Decode(&msg)
+ if jerr == nil {
+ return Release{}, fmt.Errorf("unexpected status %v (%v) returned, message:\n %v", res.StatusCode, res.Status, msg.Message)
+ }
+ }
+
+ _ = res.Body.Close()
+ return Release{}, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
+ }
+
+ buf, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ _ = res.Body.Close()
+ return Release{}, err
+ }
+
+ err = res.Body.Close()
+ if err != nil {
+ return Release{}, err
+ }
+
+ var release Release
+ var releaseList []Release
+ err = json.Unmarshal(buf, &releaseList)
+ if err != nil {
+ return Release{}, err
+ }
+ if ver == "0" {
+ release = releaseList[0]
+ glog.V(0).Infof("latest version is %v\n", release.TagName)
+ } else {
+ for _, r := range releaseList {
+ if r.TagName == ver {
+ release = r
+ break
+ }
+ }
+ }
+
+ if release.TagName == "" {
+ return Release{}, fmt.Errorf("cannot find the specified version")
+ }
+
+ release.Version = release.TagName
+ return release, nil
+}
+
+func getGithubData(ctx context.Context, url string) ([]byte, error) {
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // request binary data
+ req.Header.Set("Accept", "application/octet-stream")
+
+ res, err := ctxhttp.Do(ctx, http.DefaultClient, req)
+ if err != nil {
+ return nil, err
+ }
+
+ if res.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
+ }
+
+ buf, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ _ = res.Body.Close()
+ return nil, err
+ }
+
+ err = res.Body.Close()
+ if err != nil {
+ return nil, err
+ }
+
+ return buf, nil
+}
+
+func getGithubDataFile(ctx context.Context, assets []Asset, suffix string) (filename string, data []byte, err error) {
+ var url string
+ for _, a := range assets {
+ if strings.HasSuffix(a.Name, suffix) {
+ url = a.URL
+ filename = a.Name
+ break
+ }
+ }
+
+ if url == "" {
+ return "", nil, fmt.Errorf("unable to find file with suffix %v", suffix)
+ }
+
+ glog.V(0).Infof("download %v\n", filename)
+ data, err = getGithubData(ctx, url)
+ if err != nil {
+ return "", nil, err
+ }
+
+ return filename, data, nil
+}
+
+func extractToFile(buf []byte, filename, target string) error {
+ var rd io.Reader = bytes.NewReader(buf)
+
+ switch filepath.Ext(filename) {
+ case ".gz":
+ gr, err := gzip.NewReader(rd)
+ if err != nil {
+ return err
+ }
+ defer gr.Close()
+ trd := tar.NewReader(gr)
+ _, terr := trd.Next()
+ if terr != nil {
+ glog.Errorf("uncompress file(%s) failed: %s", filename, terr)
+ return terr
+ }
+ rd = trd
+ case ".zip":
+ zrd, err := zip.NewReader(bytes.NewReader(buf), int64(len(buf)))
+ if err != nil {
+ return err
+ }
+
+ if len(zrd.File) != 1 {
+ return fmt.Errorf("ZIP archive contains more than one file")
+ }
+
+ file, err := zrd.File[0].Open()
+ if err != nil {
+ return err
+ }
+
+ defer func() {
+ _ = file.Close()
+ }()
+
+ rd = file
+ }
+
+ // Write everything to a temp file
+ dir := filepath.Dir(target)
+ new, err := ioutil.TempFile(dir, "weed")
+ if err != nil {
+ return err
+ }
+
+ n, err := io.Copy(new, rd)
+ if err != nil {
+ _ = new.Close()
+ _ = os.Remove(new.Name())
+ return err
+ }
+ if err = new.Sync(); err != nil {
+ return err
+ }
+ if err = new.Close(); err != nil {
+ return err
+ }
+
+ mode := os.FileMode(0755)
+ // attempt to find the original mode
+ if fi, err := os.Lstat(target); err == nil {
+ mode = fi.Mode()
+ }
+
+ // Rename the temp file to the final location atomically.
+ if err := os.Rename(new.Name(), target); err != nil {
+ return err
+ }
+
+ glog.V(0).Infof("saved %d bytes in %v\n", n, target)
+ return os.Chmod(target, mode)
+}
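
Usage sketch for the new command (the version tag below is illustrative):

    weed update                                    # download the latest release
    weed update -dir=/usr/local/bin -version=3.07  # download a specific tag
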
diff --git a/weed/command/update_full.go b/weed/command/update_full.go
new file mode 100644
index 000000000..185203aee
--- /dev/null
+++ b/weed/command/update_full.go
@@ -0,0 +1,9 @@
+//go:build elastic && ydb && gocdk && tikv
+// +build elastic,ydb,gocdk,tikv
+
+package command
+
+// set isFullVersion to true when the full set of build tags is enabled
+func init() {
+ isFullVersion = true
+}
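
For reference, isFullVersion flips on only when all four build tags are set,
e.g. a build invoked roughly like (the exact Makefile target may differ):

    go build -tags elastic,ydb,gocdk,tikv ./weed
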
diff --git a/weed/command/volume.go b/weed/command/volume.go
index b1455352c..158bdf162 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -65,7 +65,8 @@ type VolumeServerOptions struct {
preStopSeconds *int
metricsHttpPort *int
// pulseSeconds *int
- enableTcp *bool
+ enableTcp *bool
+ inflightUploadDataTimeout *time.Duration
}
func init() {
@@ -96,6 +97,7 @@ func init() {
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port")
+ v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "timeout for in-flight upload data on volume servers")
}
var cmdVolume = &Command{
@@ -244,6 +246,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
int64(*v.concurrentDownloadLimitMB)*1024*1024,
+ *v.inflightUploadDataTimeout,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 091bbee5a..4eb657dfa 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,7 +3,6 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"net/url"
@@ -11,6 +10,8 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -63,14 +64,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
if err != nil {
- return chunks, nil, err
+ return dataChunks, nil, err
}
manifestChunks = append(manifestChunks, chunk)
// recursive
subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
- return chunks, nil, subErr
+ return dataChunks, nil, subErr
}
dataChunks = append(dataChunks, subDataChunks...)
manifestChunks = append(manifestChunks, subManifestChunks...)
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 208ef8095..48b344bf8 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -3,11 +3,12 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
- "golang.org/x/exp/slices"
"math"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "golang.org/x/exp/slices"
+
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -248,6 +249,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
+ if err != nil {
+ return
+ }
visibles2 := readResolvedChunks(chunks)
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 55c976915..891bf925b 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -2,13 +2,12 @@ package filer
import (
"bytes"
- "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
)
-func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfiguration) error {
+func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error {
if err := jsonpb.Unmarshal(bytes.NewBuffer(content), config); err != nil {
return err
}
diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go
new file mode 100644
index 000000000..ba1da27a8
--- /dev/null
+++ b/weed/filer/tikv/tikv.go
@@ -0,0 +1,6 @@
+/*
+ * Package tikv is the TiKV filer store.
+ * This empty file lets "go build" work without the tikv build tag.
+ * Use "make full_install" to enable the TiKV filer store.
+ */
+package tikv
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
new file mode 100644
index 000000000..f8932663d
--- /dev/null
+++ b/weed/filer/tikv/tikv_store.go
@@ -0,0 +1,396 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "bytes"
+ "context"
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+var (
+ _ filer.FilerStore = ((*TikvStore)(nil))
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+ client *txnkv.Client
+ deleteRangeConcurrency int
+ onePC bool
+}
+
+// Basic APIs
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
+ pdAddrs := []string{}
+ pdAddrsStr := config.GetString(prefix + "pdaddrs")
+ for _, item := range strings.Split(pdAddrsStr, ",") {
+ pdAddrs = append(pdAddrs, strings.TrimSpace(item))
+ }
+ drc := config.GetInt(prefix + "deleterange_concurrency")
+ if drc <= 0 {
+ drc = 1
+ }
+ store.onePC = config.GetBool(prefix + "enable_1pc")
+ store.deleteRangeConcurrency = drc
+ return store.initialize(pdAddrs)
+}
+
+func (store *TikvStore) initialize(pdAddrs []string) error {
+ client, err := txnkv.NewClient(pdAddrs)
+ store.client = client
+ return err
+}
+
+func (store *TikvStore) Shutdown() {
+ err := store.client.Close()
+ if err != nil {
+ glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
+ }
+}
+
+// ~ Basic APIs
+
+// Entry APIs
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ dir, name := entry.DirAndName()
+ key := generateKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+ return nil
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var value []byte = nil
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ value = val
+ }
+ return err
+ })
+
+ if isNotExists(err) || value == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", path, err)
+ }
+
+ entry := &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ return nil
+}
+
+// ~ Entry APIs
+
+// Directory APIs
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
+ directoryPrefix := genDirectoryKeyPrefix(path, "")
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ var (
+ startKey []byte = nil
+ endKey []byte = nil
+ )
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(directoryPrefix, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ endKey = key
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ if startKey == nil {
+ startKey = key
+ }
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ // Only one key matched; just delete it.
+ if startKey != nil && bytes.Equal(startKey, endKey) {
+ return txn.Delete(startKey)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+
+ if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
+ // both startKey and endKey exist and differ, so use a range delete
+ _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ }
+ return err
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ lastFileName := ""
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return lastFileName, err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(lastFileStart, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ i := int64(0)
+ first := true
+ for iter.Valid() {
+ if first {
+ first = false
+ if !includeStartFile {
+ if iter.Valid() {
+ // Check whether the first item is lastFileStart
+ if bytes.Equal(iter.Key(), lastFileStart) {
+ // The first item is lastFileStart and the start file
+ // is excluded, so skip it.
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ continue
+ }
+ }
+ }
+ }
+ // Check for limitation
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+ // Validate key prefix
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ value := iter.Value()
+
+ // Start process
+ fileName := getNameFromKey(key)
+ if fileName != "" {
+ // Got file name, then generate the Entry
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), fileName),
+ }
+ // Update lastFileName
+ lastFileName = fileName
+ // Decode the value into the entry.
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ // On decode failure, log and return the decode error
+ glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
+ return decodeErr
+ }
+ // Run the per-entry callback; if it returns false, stop iterating
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ // End process
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+ return lastFileName, nil
+}
+
+// ~ Directory APIs
+
+// Transaction Related APIs
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.client.Begin()
+ if err != nil {
+ return ctx, err
+ }
+ if store.onePC {
+ tx.SetEnable1PC(store.onePC)
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Commit(context.Background())
+ }
+ return nil
+}
+
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+// ~ Transaction Related APIs
+
+// Transaction Wrapper
+type TxnWrapper struct {
+ *txnkv.KVTxn
+ inContext bool
+}
+
+func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
+ err := f(w.KVTxn)
+ if !w.inContext {
+ if err != nil {
+ w.KVTxn.Rollback()
+ return err
+ }
+ return w.KVTxn.Commit(context.Background())
+ }
+ return err
+}
+
+func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return &TxnWrapper{tx, true}, nil
+ }
+ txn, err := store.client.Begin()
+ if err != nil {
+ return nil, err
+ }
+ if store.onePC {
+ txn.SetEnable1PC(store.onePC)
+ }
+ return &TxnWrapper{txn, false}, nil
+}
+
+// ~ Transaction Wrapper
+
+// Encoding Functions
+func hashToBytes(dir string) []byte {
+ h := sha1.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func generateKey(dirPath, fileName string) []byte {
+ key := hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func getNameFromKey(key []byte) string {
+ return string(key[sha1.Size:])
+}
+
+func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func isNotExists(err error) bool {
+ if err == nil {
+ return false
+ }
+ if err.Error() == "not exist" {
+ return true
+ }
+ return false
+}
+
+// ~ Encoding Functions
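
The encoding scheme above gives every directory a fixed 20-byte SHA-1 prefix,
so a directory listing becomes a TiKV prefix scan. A self-contained sketch of
the resulting key layout:

    package main

    import (
        "crypto/sha1"
        "fmt"
    )

    func main() {
        dir, name := "/buckets/b1", "a.txt"
        h := sha1.Sum([]byte(dir))           // 20-byte directory hash
        key := append(h[:], []byte(name)...) // hash || raw file name
        fmt.Printf("%x\n", key)
    }
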
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
new file mode 100644
index 000000000..1d9428c69
--- /dev/null
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -0,0 +1,50 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+}
+
+func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var data []byte = nil
+ err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ data = val
+ }
+ return err
+ })
+ if isNotExists(err) {
+ return data, filer.ErrKvNotFound
+ }
+ return data, err
+}
+
+func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+}
diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go
index e465158e8..fa17a9261 100644
--- a/weed/mount/inode_to_path.go
+++ b/weed/mount/inode_to_path.go
@@ -157,6 +157,10 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn
defer i.Unlock()
sourceInode, sourceFound := i.path2inode[sourcePath]
targetInode, targetFound := i.path2inode[targetPath]
+ if targetFound {
+ delete(i.inode2path, targetInode)
+ delete(i.path2inode, targetPath)
+ }
if sourceFound {
delete(i.path2inode, sourcePath)
i.path2inode[targetPath] = sourceInode
@@ -165,11 +169,14 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn
// so no need to worry about their source inodes
return
}
- i.inode2path[sourceInode].FullPath = targetPath
- if targetFound {
- delete(i.inode2path, targetInode)
+ if entry, entryFound := i.inode2path[sourceInode]; entryFound {
+ entry.FullPath = targetPath
+ entry.isChildrenCached = false
+ if !targetFound {
+ entry.nlookup++
+ }
} else {
- i.inode2path[sourceInode].nlookup++
+ glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode)
}
return targetInode
}
diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go
index 289a0bc2c..4246f0a4c 100644
--- a/weed/mount/weedfs_dir_mkrm.go
+++ b/weed/mount/weedfs_dir_mkrm.go
@@ -104,7 +104,7 @@ func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string
glog.V(3).Infof("remove directory: %v", entryFullPath)
ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
- err := filer_pb.Remove(wfs, string(dirFullPath), name, true, true, ignoreRecursiveErr, false, []int32{wfs.signature})
+ err := filer_pb.Remove(wfs, string(dirFullPath), name, true, false, ignoreRecursiveErr, false, []int32{wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s: %v", entryFullPath, err)
if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
diff --git a/weed/pb/filer_pb/signature.go b/weed/pb/filer_pb/signature.go
deleted file mode 100644
index e13afc656..000000000
--- a/weed/pb/filer_pb/signature.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package filer_pb
-
-func (r *CreateEntryRequest) AddSignature(sig int32) {
- r.Signatures = append(r.Signatures, sig)
-}
-func (r *CreateEntryRequest) HasSigned(sig int32) bool {
- for _, s := range r.Signatures {
- if s == sig {
- return true
- }
- }
- return false
-}
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index d89e61433..990cf74f9 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -46,8 +46,9 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
var options []grpc.ServerOption
options = append(options,
grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: 10 * time.Second, // wait time before ping if no activity
- Timeout: 20 * time.Second, // ping timeout
+ Time: 10 * time.Second, // wait time before ping if no activity
+ Timeout: 20 * time.Second, // ping timeout
+ MaxConnectionAge: 10 * time.Hour,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 60 * time.Second, // min time a client should wait before sending a ping
diff --git a/weed/pb/remote.proto b/weed/pb/remote.proto
index bdecf4dbe..13f7a878b 100644
--- a/weed/pb/remote.proto
+++ b/weed/pb/remote.proto
@@ -49,11 +49,6 @@ message RemoteConf {
string wasabi_endpoint = 42;
string wasabi_region = 43;
- repeated string hdfs_namenodes = 50;
- string hdfs_username = 51;
- string hdfs_service_principal_name = 52;
- string hdfs_data_transfer_protection = 53;
-
string filebase_access_key = 60;
string filebase_secret_key = 61;
string filebase_endpoint = 62;
diff --git a/weed/pb/s3.proto b/weed/pb/s3.proto
index 4f129b817..45a877fac 100644
--- a/weed/pb/s3.proto
+++ b/weed/pb/s3.proto
@@ -23,3 +23,13 @@ message S3ConfigureRequest {
message S3ConfigureResponse {
}
+
+message S3CircuitBreakerConfig {
+ S3CircuitBreakerOptions global = 1;
+ map<string, S3CircuitBreakerOptions> buckets = 2;
+}
+
+message S3CircuitBreakerOptions {
+ bool enabled = 1;
+ map<string, int64> actions = 2;
+}
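
A sketch of constructing the new messages from Go (the action keys and limit
values here are hypothetical, not taken from this change):

    import "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"

    cfg := &s3_pb.S3CircuitBreakerConfig{
        Global: &s3_pb.S3CircuitBreakerOptions{
            Enabled: true,
            Actions: map[string]int64{"Read": 500, "Write": 200}, // hypothetical limits
        },
        Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{
            "bucket1": {Enabled: true, Actions: map[string]int64{"Write": 50}},
        },
    }
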
diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go
index 53f174f02..c1bd23556 100644
--- a/weed/pb/s3_pb/s3.pb.go
+++ b/weed/pb/s3_pb/s3.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.1
// source: s3.proto
package s3_pb
@@ -105,6 +105,116 @@ func (*S3ConfigureResponse) Descriptor() ([]byte, []int) {
return file_s3_proto_rawDescGZIP(), []int{1}
}
+type S3CircuitBreakerConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Global *S3CircuitBreakerOptions `protobuf:"bytes,1,opt,name=global,proto3" json:"global,omitempty"`
+ Buckets map[string]*S3CircuitBreakerOptions `protobuf:"bytes,2,rep,name=buckets,proto3" json:"buckets,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
+}
+
+func (x *S3CircuitBreakerConfig) Reset() {
+ *x = S3CircuitBreakerConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_s3_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *S3CircuitBreakerConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*S3CircuitBreakerConfig) ProtoMessage() {}
+
+func (x *S3CircuitBreakerConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_s3_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use S3CircuitBreakerConfig.ProtoReflect.Descriptor instead.
+func (*S3CircuitBreakerConfig) Descriptor() ([]byte, []int) {
+ return file_s3_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *S3CircuitBreakerConfig) GetGlobal() *S3CircuitBreakerOptions {
+ if x != nil {
+ return x.Global
+ }
+ return nil
+}
+
+func (x *S3CircuitBreakerConfig) GetBuckets() map[string]*S3CircuitBreakerOptions {
+ if x != nil {
+ return x.Buckets
+ }
+ return nil
+}
+
+type S3CircuitBreakerOptions struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Enabled bool `protobuf:"varint,1,opt,name=enabled,proto3" json:"enabled,omitempty"`
+ Actions map[string]int64 `protobuf:"bytes,2,rep,name=actions,proto3" json:"actions,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
+}
+
+func (x *S3CircuitBreakerOptions) Reset() {
+ *x = S3CircuitBreakerOptions{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_s3_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *S3CircuitBreakerOptions) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*S3CircuitBreakerOptions) ProtoMessage() {}
+
+func (x *S3CircuitBreakerOptions) ProtoReflect() protoreflect.Message {
+ mi := &file_s3_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use S3CircuitBreakerOptions.ProtoReflect.Descriptor instead.
+func (*S3CircuitBreakerOptions) Descriptor() ([]byte, []int) {
+ return file_s3_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *S3CircuitBreakerOptions) GetEnabled() bool {
+ if x != nil {
+ return x.Enabled
+ }
+ return false
+}
+
+func (x *S3CircuitBreakerOptions) GetActions() map[string]int64 {
+ if x != nil {
+ return x.Actions
+ }
+ return nil
+}
+
var File_s3_proto protoreflect.FileDescriptor
var file_s3_proto_rawDesc = []byte{
@@ -116,18 +226,47 @@ var file_s3_proto_rawDesc = []byte{
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x1a, 0x73, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x69, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e,
0x74, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x5f, 0x0a, 0x09, 0x53, 0x65, 0x61, 0x77,
- 0x65, 0x65, 0x64, 0x53, 0x33, 0x12, 0x52, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75,
- 0x72, 0x65, 0x12, 0x20, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52,
- 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x49, 0x0a, 0x10, 0x73, 0x65, 0x61,
- 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x07, 0x53,
- 0x33, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
- 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61,
- 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73,
- 0x33, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x87, 0x02, 0x0a, 0x16, 0x53, 0x33, 0x43,
+ 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x42, 0x72, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x12, 0x3d, 0x0a, 0x06, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+ 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x42, 0x72, 0x65, 0x61,
+ 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x06, 0x67, 0x6c, 0x6f, 0x62,
+ 0x61, 0x6c, 0x12, 0x4b, 0x0a, 0x07, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+ 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x42, 0x72, 0x65, 0x61,
+ 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74,
+ 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x1a,
+ 0x61, 0x0a, 0x0c, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12,
+ 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65,
+ 0x79, 0x12, 0x3b, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
+ 0x53, 0x33, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74, 0x42, 0x72, 0x65, 0x61, 0x6b, 0x65, 0x72,
+ 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02,
+ 0x38, 0x01, 0x22, 0xbd, 0x01, 0x0a, 0x17, 0x53, 0x33, 0x43, 0x69, 0x72, 0x63, 0x75, 0x69, 0x74,
+ 0x42, 0x72, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x18,
+ 0x0a, 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52,
+ 0x07, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x64, 0x12, 0x4c, 0x0a, 0x07, 0x61, 0x63, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x69, 0x72, 0x63, 0x75,
+ 0x69, 0x74, 0x42, 0x72, 0x65, 0x61, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x61,
+ 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e,
+ 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02,
+ 0x38, 0x01, 0x32, 0x5f, 0x0a, 0x09, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x53, 0x33, 0x12,
+ 0x52, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x12, 0x20, 0x2e, 0x6d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x33, 0x43, 0x6f,
+ 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x33,
+ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x22, 0x00, 0x42, 0x49, 0x0a, 0x10, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73,
+ 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x07, 0x53, 0x33, 0x50, 0x72, 0x6f, 0x74, 0x6f,
+ 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72,
+ 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73,
+ 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x73, 0x33, 0x5f, 0x70, 0x62, 0x62, 0x06,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -142,19 +281,27 @@ func file_s3_proto_rawDescGZIP() []byte {
return file_s3_proto_rawDescData
}
-var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_s3_proto_goTypes = []interface{}{
- (*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest
- (*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse
+ (*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest
+ (*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse
+ (*S3CircuitBreakerConfig)(nil), // 2: messaging_pb.S3CircuitBreakerConfig
+ (*S3CircuitBreakerOptions)(nil), // 3: messaging_pb.S3CircuitBreakerOptions
+ nil, // 4: messaging_pb.S3CircuitBreakerConfig.BucketsEntry
+ nil, // 5: messaging_pb.S3CircuitBreakerOptions.ActionsEntry
}
var file_s3_proto_depIdxs = []int32{
- 0, // 0: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest
- 1, // 1: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse
- 1, // [1:2] is the sub-list for method output_type
- 0, // [0:1] is the sub-list for method input_type
- 0, // [0:0] is the sub-list for extension type_name
- 0, // [0:0] is the sub-list for extension extendee
- 0, // [0:0] is the sub-list for field type_name
+ 3, // 0: messaging_pb.S3CircuitBreakerConfig.global:type_name -> messaging_pb.S3CircuitBreakerOptions
+ 4, // 1: messaging_pb.S3CircuitBreakerConfig.buckets:type_name -> messaging_pb.S3CircuitBreakerConfig.BucketsEntry
+ 5, // 2: messaging_pb.S3CircuitBreakerOptions.actions:type_name -> messaging_pb.S3CircuitBreakerOptions.ActionsEntry
+ 3, // 3: messaging_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> messaging_pb.S3CircuitBreakerOptions
+ 0, // 4: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest
+ 1, // 5: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse
+ 5, // [5:6] is the sub-list for method output_type
+ 4, // [4:5] is the sub-list for method input_type
+ 4, // [4:4] is the sub-list for extension type_name
+ 4, // [4:4] is the sub-list for extension extendee
+ 0, // [0:4] is the sub-list for field type_name
}
func init() { file_s3_proto_init() }
@@ -187,6 +334,30 @@ func file_s3_proto_init() {
return nil
}
}
+ file_s3_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*S3CircuitBreakerConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_s3_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*S3CircuitBreakerOptions); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -194,7 +365,7 @@ func file_s3_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_s3_proto_rawDesc,
NumEnums: 0,
- NumMessages: 2,
+ NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/weed/remote_storage/hdfs/doc.go b/weed/remote_storage/hdfs/doc.go
deleted file mode 100644
index 086c9de3f..000000000
--- a/weed/remote_storage/hdfs/doc.go
+++ /dev/null
@@ -1,9 +0,0 @@
-/*
-
-Package hdfs is for remote hdfs storage.
-
-The referenced "github.com/colinmarc/hdfs/v2" library is too big when compiled.
-So this is only compiled in "make full_install".
-
-*/
-package hdfs
diff --git a/weed/remote_storage/hdfs/hdfs_kerberos.go b/weed/remote_storage/hdfs/hdfs_kerberos.go
deleted file mode 100644
index ba152020a..000000000
--- a/weed/remote_storage/hdfs/hdfs_kerberos.go
+++ /dev/null
@@ -1,58 +0,0 @@
-//go:build hdfs
-// +build hdfs
-
-package hdfs
-
-import (
- "fmt"
- "os"
- "os/user"
- "strings"
-
- krb "github.com/jcmturner/gokrb5/v8/client"
- "github.com/jcmturner/gokrb5/v8/config"
- "github.com/jcmturner/gokrb5/v8/credentials"
-)
-
-// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
-func getKerberosClient() (*krb.Client, error) {
- configPath := os.Getenv("KRB5_CONFIG")
- if configPath == "" {
- configPath = "/etc/krb5.conf"
- }
-
- cfg, err := config.Load(configPath)
- if err != nil {
- return nil, err
- }
-
- // Determine the ccache location from the environment, falling back to the
- // default location.
- ccachePath := os.Getenv("KRB5CCNAME")
- if strings.Contains(ccachePath, ":") {
- if strings.HasPrefix(ccachePath, "FILE:") {
- ccachePath = strings.SplitN(ccachePath, ":", 2)[1]
- } else {
- return nil, fmt.Errorf("unusable ccache: %s", ccachePath)
- }
- } else if ccachePath == "" {
- u, err := user.Current()
- if err != nil {
- return nil, err
- }
-
- ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
- }
-
- ccache, err := credentials.LoadCCache(ccachePath)
- if err != nil {
- return nil, err
- }
-
- client, err := krb.NewFromCCache(ccache, cfg)
- if err != nil {
- return nil, err
- }
-
- return client, nil
-}
diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go
deleted file mode 100644
index 3b71958fd..000000000
--- a/weed/remote_storage/hdfs/hdfs_storage_client.go
+++ /dev/null
@@ -1,194 +0,0 @@
-//go:build hdfs
-// +build hdfs
-
-package hdfs
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
- "github.com/chrislusf/seaweedfs/weed/remote_storage"
- "github.com/chrislusf/seaweedfs/weed/util"
- hdfs "github.com/colinmarc/hdfs/v2"
- "io"
- "os"
- "path"
-)
-
-func init() {
- remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
-}
-
-type hdfsRemoteStorageMaker struct{}
-
-func (s hdfsRemoteStorageMaker) HasBucket() bool {
- return false
-}
-
-func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
- client := &hdfsRemoteStorageClient{
- conf: conf,
- }
-
- options := hdfs.ClientOptions{
- Addresses: conf.HdfsNamenodes,
- UseDatanodeHostname: false,
- }
-
- if conf.HdfsServicePrincipalName != "" {
- var err error
- options.KerberosClient, err = getKerberosClient()
- if err != nil {
- return nil, fmt.Errorf("get kerberos authentication: %s", err)
- }
- options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName
-
- if conf.HdfsDataTransferProtection != "" {
- options.DataTransferProtection = conf.HdfsDataTransferProtection
- }
- } else {
- options.User = conf.HdfsUsername
- }
-
- c, err := hdfs.NewClient(options)
- if err != nil {
- return nil, err
- }
-
- client.client = c
- return client, nil
-}
-
-type hdfsRemoteStorageClient struct {
- conf *remote_pb.RemoteConf
- client *hdfs.Client
-}
-
-var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
-
-func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
-
- return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
- children, err := c.client.ReadDir(string(parentDir))
- if err != nil {
- return err
- }
- for _, child := range children {
- if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{
- StorageName: c.conf.Name,
- LastLocalSyncTsNs: 0,
- RemoteETag: "",
- RemoteMtime: child.ModTime().Unix(),
- RemoteSize: child.Size(),
- }); err != nil {
- return nil
- }
- }
- return nil
- }, util.FullPath(loc.Path), visitFn)
-
-}
-func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
-
- f, err := c.client.Open(loc.Path)
- if err != nil {
- return
- }
- defer f.Close()
- data = make([]byte, size)
- _, err = f.ReadAt(data, offset)
-
- return
-
-}
-
-func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
- return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode))
-}
-
-func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
- return c.client.RemoveAll(loc.Path)
-}
-
-func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
-
- dirname := path.Dir(loc.Path)
-
- // ensure parent directory
- if err = c.client.MkdirAll(dirname, 0755); err != nil {
- return
- }
-
- // remove existing file
- info, err := c.client.Stat(loc.Path)
- if err == nil {
- err = c.client.Remove(loc.Path)
- if err != nil {
- return
- }
- }
-
- // create new file
- out, err := c.client.Create(loc.Path)
- if err != nil {
- return
- }
-
- cleanup := func() {
- if removeErr := c.client.Remove(loc.Path); removeErr != nil {
- glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
- }
- }
-
- if _, err = io.Copy(out, reader); err != nil {
- cleanup()
- return
- }
-
- if err = out.Close(); err != nil {
- cleanup()
- return
- }
-
- info, err = c.client.Stat(loc.Path)
- if err != nil {
- return
- }
-
- return &filer_pb.RemoteEntry{
- RemoteMtime: info.ModTime().Unix(),
- RemoteSize: info.Size(),
- RemoteETag: "",
- StorageName: c.conf.Name,
- }, nil
-
-}
-
-func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
- if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode {
- if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
- if err = c.client.Remove(loc.Path); err != nil {
- return fmt.Errorf("hdfs delete %s: %v", loc.Path, err)
- }
- return
-}
-
-func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
- return
-}
-
-func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) {
- return
-}
-
-func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) {
- return
-}
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
index 91fd5d830..f2bd94f56 100644
--- a/weed/s3api/auth_credentials_subscribe.go
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -22,12 +23,11 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
if message.NewParentPath != "" {
dir = message.NewParentPath
}
- if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
- if err := s3a.iam.LoadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil {
- return err
- }
- glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile)
- }
+ fileName := message.NewEntry.Name
+ content := message.NewEntry.Content
+
+ _ = s3a.onIamConfigUpdate(dir, fileName, content)
+ _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
return nil
}
@@ -38,5 +38,26 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
glog.V(0).Infof("iam follow metadata changes: %v", err)
return true
})
+}
+// reload iam config
+func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
+ if dir == filer.IamConfigDirecotry && filename == filer.IamIdentityFile {
+ if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
+ return err
+ }
+ glog.V(0).Infof("updated %s/%s", dir, filename)
+ }
+ return nil
+}
+
+// reload circuit breaker config
+func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
+ if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
+ if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
+ return err
+ }
+ glog.V(0).Infof("updated %s/%s", dir, filename)
+ }
+ return nil
}
diff --git a/weed/s3api/s3_constants/s3_config.go b/weed/s3api/s3_constants/s3_config.go
new file mode 100644
index 000000000..0fa5b26f4
--- /dev/null
+++ b/weed/s3api/s3_constants/s3_config.go
@@ -0,0 +1,18 @@
+package s3_constants
+
+import (
+ "strings"
+)
+
+var (
+ CircuitBreakerConfigDir = "/etc/s3"
+ CircuitBreakerConfigFile = "circuit_breaker.json"
+ AllowedActions = []string{ACTION_READ, ACTION_WRITE, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN}
+ LimitTypeCount = "Count"
+ LimitTypeBytes = "MB"
+ Separator = ":"
+)
+
+func Concat(elements ...string) string {
+ return strings.Join(elements, Separator)
+}
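Limit keys are colon-joined tuples built by Concat: a global limit is keyed by action plus limit type, and a per-bucket limit prepends the bucket name. Note that "MB" is the stored spelling of the bytes limit type. A small sketch, assuming the action constants resolve to plain strings such as "Read" and "Write":

    package main

    import (
        "fmt"
        "strings"
    )

    const separator = ":" // mirrors s3_constants.Separator

    func concat(elements ...string) string {
        return strings.Join(elements, separator)
    }

    func main() {
        // global limit key: action + limit type
        fmt.Println(concat("Write", "Count")) // Write:Count
        // per-bucket limit key: bucket + action + limit type ("MB" is the bytes limit)
        fmt.Println(concat("photos", "Write", "MB")) // photos:Write:MB
    }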
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
new file mode 100644
index 000000000..68fb0a5d2
--- /dev/null
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -0,0 +1,183 @@
+package s3api
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/gorilla/mux"
+ "net/http"
+ "sync"
+ "sync/atomic"
+)
+
+type CircuitBreaker struct {
+ sync.RWMutex
+ Enabled bool
+ counters map[string]*int64
+ limitations map[string]int64
+}
+
+func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
+ cb := &CircuitBreaker{
+ counters: make(map[string]*int64),
+ limitations: make(map[string]int64),
+ }
+
+ err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile)
+ if err != nil {
+ return fmt.Errorf("read S3 circuit breaker config: %v", err)
+ }
+ return cb.LoadS3ApiConfigurationFromBytes(content)
+ })
+
+ if err != nil {
+ glog.Infof("s3 circuit breaker not configured: %v", err)
+ }
+
+ return cb
+}
+
+func (cb *CircuitBreaker) LoadS3ApiConfigurationFromBytes(content []byte) error {
+ cbCfg := &s3_pb.S3CircuitBreakerConfig{}
+ if err := filer.ParseS3ConfigurationFromBytes(content, cbCfg); err != nil {
+ glog.Warningf("unmarshal error: %v", err)
+ return fmt.Errorf("unmarshal error: %v", err)
+ }
+ if err := cb.loadCircuitBreakerConfig(cbCfg); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerConfig) error {
+
+ //global
+ globalEnabled := false
+ globalOptions := cfg.Global
+ limitations := make(map[string]int64)
+ if globalOptions != nil && globalOptions.Enabled && len(globalOptions.Actions) > 0 {
+ globalEnabled = globalOptions.Enabled
+ for action, limit := range globalOptions.Actions {
+ limitations[action] = limit
+ }
+ }
+ cb.Enabled = globalEnabled
+
+ //buckets
+ for bucket, cbOptions := range cfg.Buckets {
+ if cbOptions.Enabled {
+ for action, limit := range cbOptions.Actions {
+ limitations[s3_constants.Concat(bucket, action)] = limit
+ }
+ }
+ }
+
+ cb.limitations = limitations
+ return nil
+}
+
+func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if !cb.Enabled {
+ f(w, r)
+ return
+ }
+
+ vars := mux.Vars(r)
+ bucket := vars["bucket"]
+
+ rollback, errCode := cb.limit(r, bucket, action)
+ defer func() {
+ for _, rf := range rollback {
+ rf()
+ }
+ }()
+
+ if errCode == s3err.ErrNone {
+ f(w, r)
+ return
+ }
+ s3err.WriteErrorResponse(w, r, errCode)
+ }, Action(action)
+}
+
+func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) {
+
+ //bucket simultaneous request count
+ bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
+ if bucketCountRollBack != nil {
+ rollback = append(rollback, bucketCountRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //bucket simultaneous request content bytes
+ bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
+ if bucketContentLengthRollBack != nil {
+ rollback = append(rollback, bucketContentLengthRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //global simultaneous request count
+ globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
+ if globalCountRollBack != nil {
+ rollback = append(rollback, globalCountRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //global simultaneous request content bytes
+ globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
+ if globalContentLengthRollBack != nil {
+ rollback = append(rollback, globalContentLengthRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+ return
+}
+
+func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
+ e = s3err.ErrNone
+ if max, ok := cb.limitations[key]; ok {
+ cb.RLock()
+ counter, exists := cb.counters[key]
+ cb.RUnlock()
+
+ if !exists {
+ cb.Lock()
+ counter, exists = cb.counters[key]
+ if !exists {
+ var newCounter int64
+ counter = &newCounter
+ cb.counters[key] = counter
+ }
+ cb.Unlock()
+ }
+ current := atomic.LoadInt64(counter)
+ if current+inc > max {
+ e = errCode
+ return
+ } else {
+ current := atomic.AddInt64(counter, inc)
+ f = func() {
+ atomic.AddInt64(counter, -inc)
+ }
+ if current > max {
+ e = errCode
+ return
+ }
+ }
+ }
+ return
+}
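loadCounterAndCompare is an optimistic admit-then-check: the request's weight is added to a shared counter, the new total is compared against the limit, and a rollback closure that subtracts the weight is handed back either way; the caller defers all rollbacks, so even a rejected request cannot leak counter weight. A simplified, runnable sketch of the pattern (the double-checked lazy counter creation above is collapsed into a plain pointer here):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // tryAcquire adds inc to counter and reports whether the new total stays
    // within limit; the returned release func always undoes the add, mirroring
    // the rollback closures collected by (*CircuitBreaker).limit.
    func tryAcquire(counter *int64, inc, limit int64) (release func(), ok bool) {
        release = func() { atomic.AddInt64(counter, -inc) }
        if atomic.AddInt64(counter, inc) > limit {
            return release, false // over the limit: caller rejects, then releases
        }
        return release, true
    }

    func main() {
        var inflight int64
        for i := 0; i < 3; i++ {
            release, ok := tryAcquire(&inflight, 1, 2)
            fmt.Printf("request %d admitted=%v inflight=%d\n",
                i, ok, atomic.LoadInt64(&inflight))
            defer release()
        }
    }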
diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go
new file mode 100644
index 000000000..5848cf164
--- /dev/null
+++ b/weed/s3api/s3api_circuit_breaker_test.go
@@ -0,0 +1,107 @@
+package s3api
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "testing"
+)
+
+type TestLimitCase struct {
+ actionName string
+
+ limitType string
+ bucketLimitValue int64
+ globalLimitValue int64
+
+ routineCount int
+ successCount int64
+}
+
+var (
+ bucket = "/test"
+ action = s3_constants.ACTION_WRITE
+ fileSize int64 = 200
+
+ TestLimitCases = []*TestLimitCase{
+
+ //bucket-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 5, 6, 60, 5},
+ {action, s3_constants.LimitTypeCount, 0, 6, 6, 0},
+
+ //global-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 6, 5, 6, 5},
+ {action, s3_constants.LimitTypeCount, 6, 0, 6, 0},
+
+ //bucket-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0},
+
+ //global-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0},
+ }
+)
+
+func TestLimit(t *testing.T) {
+ for _, tc := range TestLimitCases {
+ circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{
+ Global: &s3_pb.S3CircuitBreakerOptions{
+ Enabled: true,
+ Actions: map[string]int64{
+ s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue,
+ },
+ },
+ Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{
+ bucket: {
+ Enabled: true,
+ Actions: map[string]int64{
+ s3_constants.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue,
+ },
+ },
+ },
+ }
+ circuitBreaker := &CircuitBreaker{
+ counters: make(map[string]*int64),
+ limitations: make(map[string]int64),
+ }
+ err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName)
+ if successCount != tc.successCount {
+ t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc)
+ }
+ }
+}
+
+func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 {
+ var successCounter int64
+ resultCh := make(chan []func(), routineCount)
+ var wg sync.WaitGroup
+ for i := 0; i < routineCount; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ rollbackFn, errCode := circuitBreaker.limit(r, bucket, action)
+ if errCode == s3err.ErrNone {
+ atomic.AddInt64(&successCounter, 1)
+ }
+ resultCh <- rollbackFn
+ }()
+ }
+ wg.Wait()
+ close(resultCh)
+ for fns := range resultCh {
+ for _, fn := range fns {
+ fn()
+ }
+ }
+ return successCounter
+}
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index 9157748f6..6b67ef337 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -75,8 +75,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
return
}
- dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject), dstBucket)
+ dstUrl := fmt.Sprintf("http://%s%s/%s%s",
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject))
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
@@ -164,8 +164,8 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
rangeHeader := r.Header.Get("x-amz-copy-source-range")
- dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket)
+ dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index e650c9156..d2ff87832 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -4,16 +4,17 @@ import (
"crypto/sha1"
"encoding/xml"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
"io"
"net/http"
"net/url"
"strconv"
"strings"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
)
@@ -119,7 +120,9 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
- writeSuccessResponseXML(w, r, response)
+ //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
+ s3err.WriteXMLResponse(w, r, http.StatusNoContent, response)
+ s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
}
@@ -244,8 +247,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, bucket)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
if partID == 1 && r.Header.Get("Content-Type") == "" {
dataReader = mimeDetect(r, dataReader)
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 657fa8171..cc5ca5231 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -3,13 +3,13 @@ package s3api
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
"net"
"net/http"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb"
. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
@@ -35,6 +35,7 @@ type S3ApiServer struct {
s3_pb.UnimplementedSeaweedS3Server
option *S3ApiServerOption
iam *IdentityAccessManagement
+ cb *CircuitBreaker
randomClientId int32
filerGuard *security.Guard
client *http.Client
@@ -55,6 +56,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
iam: NewIdentityAccessManagement(option),
randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+ cb: NewCircuitBreaker(option),
}
if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" {
s3ApiServer.client = &http.Client{Transport: &http.Transport{
@@ -73,7 +75,7 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
s3ApiServer.registerRouter(router)
- go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano())
+ go s3ApiServer.subscribeMetaEvents("s3", filer.DirectoryEtcRoot, time.Now().UnixNano())
return s3ApiServer, nil
}
@@ -107,115 +109,115 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// objects with query
// CopyObjectPart
- bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
+ bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// PutObjectPart
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
- bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.CompleteMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
- bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.NewMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploads", "")
+ bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
// AbortMultipartUpload
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
// ListObjectParts
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
// ListMultipartUploads
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
// GetObjectTagging
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")
// PutObjectTagging
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "")
// DeleteObjectTagging
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "")
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
// PutObjectACL
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectAclHandler, ACTION_WRITE), "PUT")).Queries("acl", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
// PutObjectRetention
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectRetentionHandler, ACTION_WRITE), "PUT")).Queries("retention", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
// PutObjectLegalHold
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLegalHoldHandler, ACTION_WRITE), "PUT")).Queries("legal-hold", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "")
// PutObjectLockConfiguration
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE), "PUT")).Queries("object-lock", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
// GetObjectACL
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectAclHandler, ACTION_READ), "GET")).Queries("acl", "")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
// objects with query
// raw objects
// HeadObject
- bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.HeadObjectHandler, ACTION_READ), "GET"))
+ bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET"))
// GetObject, but directory listing is not supported
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET"))
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET"))
// CopyObject
- bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
+ bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY"))
// PutObject
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectHandler, ACTION_WRITE), "PUT"))
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT"))
// DeleteObject
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectHandler, ACTION_WRITE), "DELETE"))
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE"))
// raw objects
// buckets with query
// DeleteMultipleObjects
- bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE), "DELETE")).Queries("delete", "")
+ bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
// GetBucketACL
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketAclHandler, ACTION_READ), "GET")).Queries("acl", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
// PutBucketACL
- bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketAclHandler, ACTION_WRITE), "PUT")).Queries("acl", "")
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
// GetBucketPolicy
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketPolicyHandler, ACTION_READ), "GET")).Queries("policy", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")
// PutBucketPolicy
- bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketPolicyHandler, ACTION_WRITE), "PUT")).Queries("policy", "")
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "")
// DeleteBucketPolicy
- bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketPolicyHandler, ACTION_WRITE), "DELETE")).Queries("policy", "")
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "")
// GetBucketCors
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketCorsHandler, ACTION_READ), "GET")).Queries("cors", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "")
// PutBucketCors
- bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketCorsHandler, ACTION_WRITE), "PUT")).Queries("cors", "")
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "")
// DeleteBucketCors
- bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketCorsHandler, ACTION_WRITE), "DELETE")).Queries("cors", "")
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "")
// GetBucketLifecycleConfiguration
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ), "GET")).Queries("lifecycle", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "")
// PutBucketLifecycleConfiguration
- bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE), "PUT")).Queries("lifecycle", "")
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "")
// DeleteBucketLifecycleConfiguration
- bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE), "DELETE")).Queries("lifecycle", "")
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "")
// GetBucketLocation
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketLocationHandler, ACTION_READ), "GET")).Queries("location", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "")
// GetBucketRequestPayment
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketRequestPaymentHandler, ACTION_READ), "GET")).Queries("requestPayment", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "")
// ListObjectsV2
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
// buckets with query
// raw buckets
// PostPolicy
- bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST"))
+ bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST"))
// HeadBucket
- bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.HeadBucketHandler, ACTION_READ), "GET"))
+ bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
// PutBucket
bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT"))
// DeleteBucket
- bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE"))
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE"))
// ListObjectsV1 (Legacy)
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST"))
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))
// raw buckets
diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index 2e93f49cb..57f269a2e 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -104,6 +104,9 @@ const (
ErrExistingObjectIsDirectory
ErrExistingObjectIsFile
+
+ ErrTooManyRequest
+ ErrRequestBytesExceed
)
// error code to APIError structure, these fields carry respective
@@ -401,6 +404,16 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Existing Object is a file.",
HTTPStatusCode: http.StatusConflict,
},
+ ErrTooManyRequest: {
+ Code: "ErrTooManyRequest",
+ Description: "Too many simultaneous request count",
+ HTTPStatusCode: http.StatusTooManyRequests,
+ },
+ ErrRequestBytesExceed: {
+ Code: "ErrRequestBytesExceed",
+ Description: "Simultaneous request bytes exceed limitations",
+ HTTPStatusCode: http.StatusTooManyRequests,
+ },
}
// GetAPIError provides API Error for input API error code.
diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go
index 973d8c0eb..003807a25 100644
--- a/weed/s3api/stats.go
+++ b/weed/s3api/stats.go
@@ -1,6 +1,7 @@
package s3api
import (
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
"strconv"
@@ -27,11 +28,12 @@ func (r *StatusRecorder) Flush() {
func track(f http.HandlerFunc, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
+ bucket, _ := s3_constants.GetBucketAndObject(r)
w.Header().Set("Server", "SeaweedFS S3")
recorder := NewStatusResponseWriter(w)
start := time.Now()
f(recorder, r)
- stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds())
- stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc()
+ stats_collect.S3RequestHistogram.WithLabelValues(action, bucket).Observe(time.Since(start).Seconds())
+ stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status), bucket).Inc()
}
}
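Because track now passes a third label value, the S3RequestCounter and S3RequestHistogram vectors in weed/stats must declare a matching "bucket" label, otherwise WithLabelValues panics at runtime. A sketch of the expected shape, assuming the standard github.com/prometheus/client_golang API (the metric definition here is illustrative, not the exact one in weed/stats/metrics.go):

    package main

    import (
        "github.com/prometheus/client_golang/prometheus"
    )

    // illustrative stand-in for the real definition in weed/stats
    var s3RequestCounter = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Namespace: "SeaweedFS",
            Subsystem: "s3",
            Name:      "request_total",
            Help:      "Counter of s3 requests.",
        },
        []string{"type", "code", "bucket"}, // third label added for the bucket
    )

    func main() {
        // what track() effectively records after a handler returns:
        s3RequestCounter.WithLabelValues("PUT", "200", "photos").Inc()
    }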
diff --git a/weed/security/tls.go b/weed/security/tls.go
index 79552c026..bfa9d43c7 100644
--- a/weed/security/tls.go
+++ b/weed/security/tls.go
@@ -1,24 +1,22 @@
package security
import (
- "context"
"crypto/tls"
"crypto/x509"
+ "fmt"
+ "google.golang.org/grpc/credentials/tls/certprovider/pemfile"
+ "google.golang.org/grpc/security/advancedtls"
"io/ioutil"
- "os"
"strings"
-
- grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/peer"
- "google.golang.org/grpc/status"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
)
+const credRefreshingInterval = 5 * time.Hour
+
type Authenticator struct {
AllowedWildcardDomain string
AllowedCommonNames map[string]bool
@@ -29,28 +27,39 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption
return nil, nil
}
- // load cert/key, ca cert
- cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
+ serverOptions := pemfile.Options{
+ CertFile: config.GetString(component + ".cert"),
+ KeyFile: config.GetString(component + ".key"),
+ RefreshDuration: credRefreshingInterval,
+ }
+
+ serverIdentityProvider, err := pemfile.NewProvider(serverOptions)
if err != nil {
- glog.V(1).Infof("load cert: %s / key: %s error: %v",
- config.GetString(component+".cert"),
- config.GetString(component+".key"),
- err)
+ glog.Warningf("pemfile.NewProvider(%v) %v failed: %v", serverOptions, component, err)
return nil, nil
}
- caCert, err := os.ReadFile(config.GetString("grpc.ca"))
+
+ serverRootOptions := pemfile.Options{
+ RootFile: config.GetString("grpc.ca"),
+ RefreshDuration: credRefreshingInterval,
+ }
+ serverRootProvider, err := pemfile.NewProvider(serverRootOptions)
if err != nil {
- glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err)
+ glog.Warningf("pemfile.NewProvider(%v) failed: %v", serverRootOptions, err)
return nil, nil
}
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM(caCert)
- ta := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{cert},
- ClientCAs: caCertPool,
- ClientAuth: tls.RequireAndVerifyClientCert,
- })
+ // Build server-side credentials with the advancedtls API, using the certificate providers above.
+ options := &advancedtls.ServerOptions{
+ IdentityOptions: advancedtls.IdentityCertificateOptions{
+ IdentityProvider: serverIdentityProvider,
+ },
+ RootOptions: advancedtls.RootCertificateOptions{
+ RootProvider: serverRootProvider,
+ },
+ RequireClientCert: true,
+ VType: advancedtls.CertVerification,
+ }
allowedCommonNames := config.GetString(component + ".allowed_commonNames")
allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain")
if allowedCommonNames != "" || allowedWildcardDomain != "" {
@@ -62,7 +71,16 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption
AllowedCommonNames: allowedCommonNamesMap,
AllowedWildcardDomain: allowedWildcardDomain,
}
- return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate))
+ options.VerifyPeer = auther.Authenticate
+ } else {
+ options.VerifyPeer = func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ return &advancedtls.VerificationResults{}, nil
+ }
+ }
+ ta, err := advancedtls.NewServerCreds(options)
+ if err != nil {
+ glog.Warningf("advancedtls.NewServerCreds(%v) failed: %v", options, err)
+ return nil, nil
}
return grpc.Creds(ta), nil
}
@@ -77,25 +95,42 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
return grpc.WithInsecure()
}
- // load cert/key, cacert
- cert, err := tls.LoadX509KeyPair(certFileName, keyFileName)
+ clientOptions := pemfile.Options{
+ CertFile: certFileName,
+ KeyFile: keyFileName,
+ RefreshDuration: credRefreshingInterval,
+ }
+ clientProvider, err := pemfile.NewProvider(clientOptions)
if err != nil {
- glog.V(1).Infof("load cert/key error: %v", err)
+ glog.Warningf("pemfile.NewProvider(%v) failed %v", clientOptions, err)
return grpc.WithInsecure()
}
- caCert, err := os.ReadFile(caFileName)
+ clientRootOptions := pemfile.Options{
+ RootFile: config.GetString("grpc.ca"),
+ RefreshDuration: credRefreshingInterval,
+ }
+ clientRootProvider, err := pemfile.NewProvider(clientRootOptions)
if err != nil {
- glog.V(1).Infof("read ca cert file error: %v", err)
+ glog.Warningf("pemfile.NewProvider(%v) failed: %v", clientRootOptions, err)
+ return grpc.WithInsecure()
+ }
+ options := &advancedtls.ClientOptions{
+ IdentityOptions: advancedtls.IdentityCertificateOptions{
+ IdentityProvider: clientProvider,
+ },
+ VerifyPeer: func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ return &advancedtls.VerificationResults{}, nil
+ },
+ RootOptions: advancedtls.RootCertificateOptions{
+ RootProvider: clientRootProvider,
+ },
+ VType: advancedtls.CertVerification,
+ }
+ ta, err := advancedtls.NewClientCreds(options)
+ if err != nil {
+ glog.Warningf("advancedtls.NewClientCreds(%v) failed: %v", options, err)
return grpc.WithInsecure()
}
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM(caCert)
-
- ta := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{cert},
- RootCAs: caCertPool,
- InsecureSkipVerify: true,
- })
return grpc.WithTransportCredentials(ta)
}
@@ -116,27 +151,14 @@ func LoadClientTLSHTTP(clientCertFile string) *tls.Config {
}
}
-func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) {
- p, ok := peer.FromContext(ctx)
- if !ok {
- return ctx, status.Error(codes.Unauthenticated, "no peer found")
+func (a Authenticator) Authenticate(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ if a.AllowedWildcardDomain != "" && strings.HasSuffix(params.Leaf.Subject.CommonName, a.AllowedWildcardDomain) {
+ return &advancedtls.VerificationResults{}, nil
}
-
- tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
- if !ok {
- return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
- }
- if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
- return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate")
- }
-
- commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
- if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) {
- return ctx, nil
- }
- if _, ok := a.AllowedCommonNames[commonName]; ok {
- return ctx, nil
+ if _, ok := a.AllowedCommonNames[params.Leaf.Subject.CommonName]; ok {
+ return &advancedtls.VerificationResults{}, nil
}
-
- return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName)
+ err := fmt.Errorf("Authenticate: invalid subject client common name: %s", params.Leaf.Subject.CommonName)
+ glog.Error(err)
+ return nil, err
}
diff --git a/weed/sequence/snowflake_sequencer_test.go b/weed/sequence/snowflake_sequencer_test.go
new file mode 100644
index 000000000..731e330c5
--- /dev/null
+++ b/weed/sequence/snowflake_sequencer_test.go
@@ -0,0 +1,25 @@
+package sequence
+
+import (
+ "encoding/hex"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestSequencer(t *testing.T) {
+ seq, err := NewSnowflakeSequencer("for_test", 1)
+ assert.Equal(t, nil, err)
+ last := uint64(0)
+ bytes := make([]byte, types.NeedleIdSize)
+ for i := 0; i < 100; i++ {
+ next := seq.NextFileId(1)
+ types.NeedleIdToBytes(bytes, types.NeedleId(next))
+ t.Log(hex.EncodeToString(bytes))
+ if last == next {
+ t.Errorf("last %d next %d", last, next)
+ }
+ last = next
+ }
+
+}
diff --git a/weed/server/common.go b/weed/server/common.go
index 39a8637ac..f02ec67ac 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -284,6 +284,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -294,6 +295,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
//mostly copy from src/pkg/net/http/fs.go
ranges, err := parseRange(rangeReq, totalSize)
if err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return
}
@@ -326,6 +328,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.WriteHeader(http.StatusPartialContent)
err = writeFn(bufferedWriter, ra.start, ra.length)
if err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -365,6 +368,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
}
w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
+ glog.Errorf("processRangeRequest err: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 4d0fbbc41..4f5455cb1 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -263,8 +263,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
ms.deleteClient(clientName)
}()
-
- for _, message := range ms.Topo.ToVolumeLocations() {
+ for i, message := range ms.Topo.ToVolumeLocations() {
+ if i == 0 {
+ if leader, err := ms.Topo.Leader(); err == nil {
+ message.Leader = string(leader)
+ }
+ }
if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil {
return sendErr
}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index bc92dd332..9da947869 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -52,8 +52,13 @@ func (ms *MasterServer) ProcessGrowRequest() {
go func() {
glog.V(1).Infoln("starting automatic volume grow")
start := time.Now()
- _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
+ newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
+ if err == nil {
+ for _, newVidLocation := range newVidLocations {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
+ }
+ }
vl.DoneGrowRequest()
if req.ErrCh != nil {
@@ -204,8 +209,9 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
+ totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
- TotalSize: stats.TotalSize,
+ TotalSize: uint64(totalSize),
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
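TotalSize is now derived from topology capacity (maximum volume slots times the configured per-volume size limit) rather than from the per-layout stats. A quick worked example with hypothetical numbers:

    package main

    import "fmt"

    func main() {
        // hypothetical cluster: 8 volume slots, 30000 MB size limit per volume
        maxVolumeCount := int64(8)
        volumeSizeLimitMB := int64(30000)
        totalSize := maxVolumeCount * volumeSizeLimitMB * 1024 * 1024
        fmt.Printf("reported TotalSize: %d bytes (~%.1f GiB)\n",
            totalSize, float64(totalSize)/(1<<30))
    }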
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index ade750ccc..47abfb892 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"net/http"
"strconv"
@@ -81,7 +82,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
- count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ var newVidLocations []*master_pb.VolumeLocation
+ newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ count = len(newVidLocations)
}
} else {
err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index cc6578bf5..9971eaa48 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -121,7 +121,10 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
if option.RaftBootstrap {
os.RemoveAll(path.Join(s.dataDir, ldbFile))
os.RemoveAll(path.Join(s.dataDir, sdbFile))
- os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshots"))
+ }
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshots"), os.ModePerm); err != nil {
+ return nil, err
}
baseDir := s.dataDir
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 8c372f0cc..ad0a1c8ce 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -125,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
return nil, err
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index e3ec5b724..b4bc850e2 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"io"
"math"
"os"
@@ -78,6 +80,28 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
}()
+ var preallocateSize int64
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
+ }
+ if resp.VolumePreallocate {
+ preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20)
+ }
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
+ }
+
+ if preallocateSize > 0 {
+ volumeFile := dataBaseFileName + ".dat"
+ _, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0)
+ if err != nil {
+ return fmt.Errorf("create volume file %s: %v", volumeFile, err)
+ }
+ }
+
// println("source:", volFileInfoResp.String())
copyResponse := &volume_server_pb.VolumeCopyResponse{}
reportInterval := int64(1024 * 1024 * 128)
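VolumeCopy now mirrors the master's preallocation policy: when the master reports VolumePreallocate, the destination .dat file is created at the full VolumeSizeLimitMB size before streaming begins, so the copy lands in space reserved up front. A rough stand-in sketch of the size math (backend.CreateVolumeFile is internal to SeaweedFS; plain Truncate substitutes for it here):

    package main

    import (
        "fmt"
        "log"
        "os"
    )

    // stand-in for backend.CreateVolumeFile: grow the file to its final size up
    // front (SeaweedFS uses platform-specific preallocation where available)
    func preallocate(path string, size int64) error {
        f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
        if err != nil {
            return err
        }
        defer f.Close()
        return f.Truncate(size)
    }

    func main() {
        volumeSizeLimitMB := uint32(1) // from GetMasterConfigurationResponse
        preallocateSize := int64(volumeSizeLimitMB) * (1 << 20)
        if err := preallocate("/tmp/1.dat", preallocateSize); err != nil {
            log.Fatal(err)
        }
        fmt.Println("preallocated", preallocateSize, "bytes")
    }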
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 477a3709c..abb30229a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"net/http"
"sync"
+ "time"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -24,7 +25,9 @@ type VolumeServer struct {
inFlightDownloadDataSize int64
concurrentUploadLimit int64
concurrentDownloadLimit int64
+ inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
+ inflightUploadDataTimeout time.Duration
SeedMasterNodes []pb.ServerAddress
currentMaster pb.ServerAddress
@@ -60,6 +63,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitMB int,
concurrentUploadLimit int64,
concurrentDownloadLimit int64,
+ inflightUploadDataTimeout time.Duration,
) *VolumeServer {
v := util.GetViper()
@@ -84,9 +88,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
+ inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
concurrentUploadLimit: concurrentUploadLimit,
concurrentDownloadLimit: concurrentDownloadLimit,
+ inflightUploadDataTimeout: inflightUploadDataTimeout,
}
vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 49bc297fb..293f36f14 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"sync/atomic"
+ "time"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -56,20 +57,31 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
- // wait until in flight data is less than the limit
contentLength := getContentLength(r)
-
// exclude the replication from the concurrentUploadLimitMB
- if vs.concurrentUploadLimit != 0 && r.URL.Query().Get("type") != "replicate" &&
- atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
- err := fmt.Errorf("reject because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
- glog.V(1).Infof("too many requests: %v", err)
- writeJsonError(w, r, http.StatusTooManyRequests, err)
- return
+ if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
+ startTime := time.Now()
+ vs.inFlightUploadDataLimitCond.L.Lock()
+ for vs.inFlightUploadDataSize > vs.concurrentUploadLimit {
+ // check whether the wait has timed out
+ if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
+ vs.inFlightUploadDataLimitCond.L.Unlock()
+ err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ glog.V(1).Infof("too many requests: %v", err)
+ writeJsonError(w, r, http.StatusTooManyRequests, err)
+ return
+ }
+ glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ vs.inFlightUploadDataLimitCond.Wait()
+ }
+ vs.inFlightUploadDataLimitCond.L.Unlock()
}
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
defer func() {
atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
+ if vs.concurrentUploadLimit != 0 {
+ vs.inFlightUploadDataLimitCond.Signal()
+ }
}()
// process uploads
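The upload limiter changes from fail-fast to wait-with-deadline: writers block on a sync.Cond while in-flight upload bytes exceed the limit, and each finished upload signals one waiter. Since sync.Cond has no native timeout, the deadline is only re-checked after a wakeup, which is why the loop compares wall-clock time on every iteration. A standalone sketch of the same pattern:

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    var errTimedOut = errors.New("wait timed out")

    // waitUnderLimit blocks until *inflight drops to limit or below, or the
    // deadline passes. Note the caveat from the handler above: sync.Cond has no
    // deadline, so the timeout is only observed after a Signal wakes the waiter.
    func waitUnderLimit(cond *sync.Cond, inflight *int64, limit int64, timeout time.Duration) error {
        start := time.Now()
        cond.L.Lock()
        defer cond.L.Unlock()
        for *inflight > limit {
            if start.Add(timeout).Before(time.Now()) {
                return errTimedOut
            }
            cond.Wait()
        }
        *inflight++ // admitted: account for this request while holding the lock
        return nil
    }

    func main() {
        cond := sync.NewCond(new(sync.Mutex))
        var inflight int64 = 3

        // a background "completed upload" frees capacity and signals one waiter
        go func() {
            time.Sleep(50 * time.Millisecond)
            cond.L.Lock()
            inflight = 1
            cond.L.Unlock()
            cond.Signal()
        }()

        fmt.Println(waitUnderLimit(cond, &inflight, 2, time.Second)) // <nil>
    }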
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 265dea03a..5140af2b4 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -48,6 +48,13 @@ type WebDavServer struct {
Handler *webdav.Handler
}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}
+
func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
fs, _ := NewWebDavFileSystem(option)
@@ -496,6 +503,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
written, err := f.bufWriter.Write(buf)
if err == nil {
+ f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
f.off += int64(written)
}
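The new max helper lets WebDAV writes track file size as a high-water mark of offset plus bytes written, so an in-place rewrite at a lower offset no longer shrinks the recorded size. A tiny worked example:

    package main

    import "fmt"

    func max(x, y int64) int64 {
        if x <= y {
            return y
        }
        return x
    }

    func main() {
        var fileSize int64
        // a 10-byte write at offset 0, then a 4-byte rewrite at offset 2
        for _, w := range []struct{ off, n int64 }{{0, 10}, {2, 4}} {
            fileSize = max(w.off+w.n, fileSize)
            fmt.Println("size after write:", fileSize) // 10, then still 10
        }
    }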
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index c892c3443..e15090190 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -108,16 +108,6 @@ func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, write
remoteConfigureCommand.StringVar(&conf.StorjSecretKey, "storj.secret_key", "", "Storj secret key")
remoteConfigureCommand.StringVar(&conf.StorjEndpoint, "storj.endpoint", "", "Storj endpoint")
- var namenodes arrayFlags
- remoteConfigureCommand.Var(&namenodes, "hdfs.namenodes", "hdfs name node and port, example: namenode1:8020,namenode2:8020")
- remoteConfigureCommand.StringVar(&conf.HdfsUsername, "hdfs.username", "", "hdfs user name")
- remoteConfigureCommand.StringVar(&conf.HdfsServicePrincipalName, "hdfs.servicePrincipalName", "", `Kerberos service principal name for the namenode
-
-Example: hdfs/namenode.hadoop.docker
-Namenode running as service 'hdfs' with FQDN 'namenode.hadoop.docker'.
-`)
- remoteConfigureCommand.StringVar(&conf.HdfsDataTransferProtection, "hdfs.dataTransferProtection", "", "[authentication|integrity|privacy] Kerberos data transfer protection")
-
if err = remoteConfigureCommand.Parse(args); err != nil {
return nil
}
@@ -223,14 +213,3 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write
return nil
}
-
-type arrayFlags []string
-
-func (i *arrayFlags) String() string {
- return "my string representation"
-}
-
-func (i *arrayFlags) Set(value string) error {
- *i = append(*i, value)
- return nil
-}
diff --git a/weed/shell/command_s3_circuitbreaker.go b/weed/shell/command_s3_circuitbreaker.go
new file mode 100644
index 000000000..7e11153bf
--- /dev/null
+++ b/weed/shell/command_s3_circuitbreaker.go
@@ -0,0 +1,358 @@
+package shell
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "io"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+var LoadConfig = loadConfig
+
+func init() {
+ Commands = append(Commands, &commandS3CircuitBreaker{})
+}
+
+type commandS3CircuitBreaker struct {
+}
+
+func (c *commandS3CircuitBreaker) Name() string {
+ return "s3.circuitBreaker"
+}
+
+func (c *commandS3CircuitBreaker) Help() string {
+ return `configure and apply s3 circuit breaker options for each bucket
+
+ # examples
+ # add circuit breaker config for global
+ s3.circuitBreaker -global -type count -actions Read,Write -values 500,200 -apply
+
+ # disable global config
+ s3.circuitBreaker -global -disable -apply
+
+ # add circuit breaker config for buckets x,y,z
+ s3.circuitBreaker -buckets x,y,z -type count -actions Read,Write -values 200,100 -apply
+
+ # disable circuit breaker config of x
+ s3.circuitBreaker -buckets x -disable -apply
+
+ # delete circuit breaker config of x
+ s3.circuitBreaker -buckets x -delete -apply
+
+ # clear all circuit breaker config
+ s3.circuitBreaker -delete -apply
+ `
+}
+
+func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ dir := s3_constants.CircuitBreakerConfigDir
+ file := s3_constants.CircuitBreakerConfigFile
+
+ s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ buckets := s3CircuitBreakerCommand.String("buckets", "", "the bucket name(s) to configure, eg: -buckets x,y,z")
+ global := s3CircuitBreakerCommand.Bool("global", false, "configure global circuit breaker")
+
+ actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin")
+ limitType := s3CircuitBreakerCommand.String("type", "", "'Count' or 'MB'; Count represents the number of simultaneous requests, and MB represents the content size of all simultaneous requests")
+ values := s3CircuitBreakerCommand.String("values", "", "comma separated values")
+
+ disabled := s3CircuitBreakerCommand.Bool("disable", false, "disable global or buckets circuit breaker")
+ deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete circuit breaker config")
+
+ apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration")
+
+	if err = s3CircuitBreakerCommand.Parse(args); err != nil {
+		return nil
+	}
+
+ var buf bytes.Buffer
+ err = LoadConfig(commandEnv, dir, file, &buf)
+ if err != nil {
+ return err
+ }
+
+ cbCfg := &s3_pb.S3CircuitBreakerConfig{
+ Buckets: make(map[string]*s3_pb.S3CircuitBreakerOptions),
+ }
+ if buf.Len() > 0 {
+ if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil {
+ return err
+ }
+ }
+
+ if *deleted {
+ cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true)
+ if err != nil {
+ return err
+ }
+
+ if len(cmdBuckets) <= 0 && !*global {
+ if len(cmdActions) > 0 {
+ deleteGlobalActions(cbCfg, cmdActions, limitType)
+ if cbCfg.Buckets != nil {
+ var allBuckets []string
+ for bucket := range cbCfg.Buckets {
+ allBuckets = append(allBuckets, bucket)
+ }
+ deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType)
+ }
+ } else {
+ cbCfg.Global = nil
+ cbCfg.Buckets = nil
+ }
+ } else {
+ if len(cmdBuckets) > 0 {
+ deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType)
+ }
+ if *global {
+			deleteGlobalActions(cbCfg, cmdActions, limitType)
+ }
+ }
+ } else {
+ cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, *disabled)
+ if err != nil {
+ return err
+ }
+
+ if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global {
+			return fmt.Errorf("one of -global or -buckets must be specified")
+ }
+
+ if len(*buckets) > 0 {
+ for _, bucket := range cmdBuckets {
+ var cbOptions *s3_pb.S3CircuitBreakerOptions
+ var exists bool
+ if cbOptions, exists = cbCfg.Buckets[bucket]; !exists {
+ cbOptions = &s3_pb.S3CircuitBreakerOptions{}
+ cbCfg.Buckets[bucket] = cbOptions
+ }
+ cbOptions.Enabled = !*disabled
+
+ if len(cmdActions) > 0 {
+ err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType)
+ if err != nil {
+ return err
+ }
+ }
+
+ if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled {
+ delete(cbCfg.Buckets, bucket)
+ }
+ }
+ }
+
+ if *global {
+ globalOptions := cbCfg.Global
+ if globalOptions == nil {
+ globalOptions = &s3_pb.S3CircuitBreakerOptions{Actions: make(map[string]int64, len(cmdActions))}
+ cbCfg.Global = globalOptions
+ }
+ globalOptions.Enabled = !*disabled
+
+ if len(cmdActions) > 0 {
+ err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType)
+ if err != nil {
+ return err
+ }
+ }
+
+ if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled {
+ cbCfg.Global = nil
+ }
+ }
+ }
+
+ buf.Reset()
+ err = filer.ProtoToText(&buf, cbCfg)
+ if err != nil {
+ return err
+ }
+
+	_, _ = fmt.Fprint(writer, buf.String())
+ _, _ = fmt.Fprintln(writer)
+
+ if *apply {
+ if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer.SaveInsideFiler(client, dir, file, buf.Bytes())
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func loadConfig(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error {
+ if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, buf)
+ }); err != nil && err != filer_pb.ErrNotFound {
+ return err
+ }
+ return nil
+}
+
+func insertOrUpdateValues(cbOptions *s3_pb.S3CircuitBreakerOptions, cmdActions []string, cmdValues []int64, limitType *string) error {
+ if len(*limitType) == 0 {
+		return fmt.Errorf("flag -type is required, only 'Count' and 'MB' are allowed")
+ }
+
+ if cbOptions.Actions == nil {
+ cbOptions.Actions = make(map[string]int64, len(cmdActions))
+ }
+
+ if len(cmdValues) > 0 {
+ for i, action := range cmdActions {
+ cbOptions.Actions[s3_constants.Concat(action, *limitType)] = cmdValues[i]
+ }
+ }
+ return nil
+}
+
+func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
+ if cbCfg.Buckets == nil {
+ return
+ }
+
+ if len(cmdActions) == 0 {
+ for _, bucket := range cmdBuckets {
+ delete(cbCfg.Buckets, bucket)
+ }
+ } else {
+ for _, bucket := range cmdBuckets {
+ if cbOption, ok := cbCfg.Buckets[bucket]; ok {
+ if len(cmdActions) > 0 && cbOption.Actions != nil {
+ for _, action := range cmdActions {
+ delete(cbOption.Actions, s3_constants.Concat(action, *limitType))
+ }
+ }
+
+ if len(cbOption.Actions) == 0 && !cbOption.Enabled {
+ delete(cbCfg.Buckets, bucket)
+ }
+ }
+ }
+ }
+
+ if len(cbCfg.Buckets) == 0 {
+ cbCfg.Buckets = nil
+ }
+}
+
+func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
+ globalOptions := cbCfg.Global
+ if globalOptions == nil {
+ return
+ }
+
+	if len(cmdActions) == 0 {
+		globalOptions.Actions = nil
+		return
+	}
+	for _, action := range cmdActions {
+		delete(globalOptions.Actions, s3_constants.Concat(action, *limitType))
+	}
+
+ if len(globalOptions.Actions) == 0 && !globalOptions.Enabled {
+ cbCfg.Global = nil
+ }
+}
+
+func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, skipValueParsing bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) {
+ if len(*buckets) > 0 {
+ cmdBuckets = strings.Split(*buckets, ",")
+ }
+
+ if len(*actions) > 0 {
+ cmdActions = strings.Split(*actions, ",")
+
+		// check that each action is allowed
+		for _, action := range cmdActions {
+			var found bool
+			for _, allowedAction := range s3_constants.AllowedActions {
+				if allowedAction == action {
+					found = true
+					break
+				}
+			}
+			if !found {
+				return nil, nil, nil, fmt.Errorf("value(%s) of flag[-actions] not valid, allowed actions: %v", *actions, s3_constants.AllowedActions)
+ }
+ }
+ }
+
+	if !skipValueParsing {
+		if len(cmdActions) == 0 {
+ for _, action := range s3_constants.AllowedActions {
+ cmdActions = append(cmdActions, action)
+ }
+ }
+
+ if len(*limitType) > 0 {
+ switch *limitType {
+ case s3_constants.LimitTypeCount:
+ elements := strings.Split(*values, ",")
+ if len(cmdActions) != len(elements) {
+				if len(elements) != 1 {
+					return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-values] not equal")
+ }
+ v, err := strconv.Atoi(elements[0])
+ if err != nil {
+					return nil, nil, nil, fmt.Errorf("values of -values must be valid numbers")
+ }
+ for range cmdActions {
+ cmdValues = append(cmdValues, int64(v))
+ }
+ } else {
+ for _, value := range elements {
+ v, err := strconv.Atoi(value)
+ if err != nil {
+						return nil, nil, nil, fmt.Errorf("values of -values must be valid numbers")
+ }
+ cmdValues = append(cmdValues, int64(v))
+ }
+ }
+ case s3_constants.LimitTypeBytes:
+ elements := strings.Split(*values, ",")
+ if len(cmdActions) != len(elements) {
+				if len(elements) != 1 {
+					return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-values] not equal")
+ }
+ v, err := parseMBToBytes(elements[0])
+ if err != nil {
+					return nil, nil, nil, fmt.Errorf("values of -values must be valid numbers")
+ }
+ for range cmdActions {
+ cmdValues = append(cmdValues, v)
+ }
+ } else {
+ for _, value := range elements {
+ v, err := parseMBToBytes(value)
+ if err != nil {
+						return nil, nil, nil, fmt.Errorf("values of -values must be valid numbers")
+ }
+ cmdValues = append(cmdValues, v)
+ }
+ }
+ default:
+				return nil, nil, nil, fmt.Errorf("type not valid, only 'Count' and 'MB' are allowed")
+ }
+		}
+ }
+ return cmdBuckets, cmdActions, cmdValues, nil
+}
+
+func parseMBToBytes(valStr string) (int64, error) {
+	v, err := strconv.Atoi(valStr)
+	if err != nil {
+		return 0, err
+	}
+	return int64(v) * 1024 * 1024, nil
+}
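Note: the per-action limit keys this command writes have the form "<Action>:<LimitType>"
(the test file below shows entries such as "Read:Count" and "Write:MB"). A minimal
standalone sketch of composing and reading such a key, assuming s3_constants.Concat
simply joins its two arguments with a colon:

	package main

	import "fmt"

	// concat mirrors the assumed behavior of s3_constants.Concat: join the
	// action name and limit type with a colon to form the config map key.
	func concat(action, limitType string) string {
		return action + ":" + limitType
	}

	func main() {
		// at most 500 concurrent Read requests and 200 concurrent Write requests
		limits := map[string]int64{
			concat("Read", "Count"):  500,
			concat("Write", "Count"): 200,
		}
		fmt.Println(limits["Read:Count"]) // 500
	}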
diff --git a/weed/shell/command_s3_circuitbreaker_test.go b/weed/shell/command_s3_circuitbreaker_test.go
new file mode 100644
index 000000000..3d0b4ac6e
--- /dev/null
+++ b/weed/shell/command_s3_circuitbreaker_test.go
@@ -0,0 +1,292 @@
+package shell
+
+import (
+ "bytes"
+ "encoding/json"
+ "reflect"
+ "strings"
+ "testing"
+)
+
+type Case struct {
+ args []string
+ result string
+}
+
+var (
+ TestCases = []*Case{
+ //add circuit breaker config for global
+ {
+ args: strings.Split("-global -type Count -actions Read,Write -values 500,200", " "),
+ result: `{
+ "global": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ }
+ }`,
+ },
+
+ //disable global config
+ {
+ args: strings.Split("-global -disable", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ }
+ }`,
+ },
+
+ //add circuit breaker config for buckets x,y,z
+ {
+ args: strings.Split("-buckets x,y,z -type Count -actions Read,Write -values 200,100", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "x": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //disable circuit breaker config of x
+ {
+ args: strings.Split("-buckets x -disable", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "x": {
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //delete circuit breaker config of x
+ {
+ args: strings.Split("-buckets x -delete", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //configure the circuit breaker for the size of the uploaded file for bucket x,y
+ {
+ args: strings.Split("-buckets x,y -type MB -actions Write -values 1024", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "x": {
+ "enabled": true,
+ "actions": {
+ "Write:MB": "1073741824"
+ }
+ },
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100",
+ "Write:MB": "1073741824"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //delete the circuit breaker configuration for the size of the uploaded file of bucket x,y
+ {
+ args: strings.Split("-buckets x,y -type MB -actions Write -delete", " "),
+ result: `{
+ "global": {
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "x": {
+ "enabled": true
+ },
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //enable global circuit breaker config (without -disable flag)
+ {
+ args: strings.Split("-global", " "),
+ result: `{
+ "global": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "500",
+ "Write:Count": "200"
+ }
+ },
+ "buckets": {
+ "x": {
+ "enabled": true
+ },
+ "y": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ },
+ "z": {
+ "enabled": true,
+ "actions": {
+ "Read:Count": "200",
+ "Write:Count": "100"
+ }
+ }
+ }
+ }`,
+ },
+
+ //clear all circuit breaker config
+ {
+ args: strings.Split("-delete", " "),
+ result: `{
+
+ }`,
+ },
+ }
+)
+
+func TestCircuitBreakerShell(t *testing.T) {
+ var writeBuf bytes.Buffer
+ cmd := &commandS3CircuitBreaker{}
+ LoadConfig = func(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error {
+ _, err := buf.Write(writeBuf.Bytes())
+ if err != nil {
+ return err
+ }
+ writeBuf.Reset()
+ return nil
+ }
+
+ for i, tc := range TestCases {
+ err := cmd.Do(tc.args, nil, &writeBuf)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if i != 0 {
+ result := writeBuf.String()
+
+ actual := make(map[string]interface{})
+ err := json.Unmarshal([]byte(result), &actual)
+ if err != nil {
+ t.Error(err)
+ }
+
+			expect := make(map[string]interface{})
+			err = json.Unmarshal([]byte(tc.result), &expect)
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(actual, expect) {
+				t.Fatalf("result of s3 circuit breaker shell command is unexpected: got %v, expect %v", actual, expect)
+ }
+ }
+ }
+}
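The test above swaps the package-level LoadConfig variable for a stub that feeds the
previous command's output back in, so each case builds on the last without a filer. A
minimal standalone sketch of this test-seam pattern:

	package main

	import (
		"bytes"
		"fmt"
	)

	// loadConfig is the real implementation; in seaweedfs it reads from the filer.
	func loadConfig(buf *bytes.Buffer) error {
		buf.WriteString("production config")
		return nil
	}

	// LoadConfig is a variable so a test can swap in a fake before running the command.
	var LoadConfig = loadConfig

	func main() {
		LoadConfig = func(buf *bytes.Buffer) error { // test override
			buf.WriteString("config from test buffer")
			return nil
		}
		var buf bytes.Buffer
		_ = LoadConfig(&buf)
		fmt.Println(buf.String())
	}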
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index 207b37c81..f0b810608 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -173,7 +173,8 @@ var (
Subsystem: "s3",
Name: "request_total",
Help: "Counter of s3 requests.",
- }, []string{"type", "code"})
+ }, []string{"type", "code", "bucket"})
+
S3RequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "SeaweedFS",
@@ -181,7 +182,7 @@ var (
Name: "request_seconds",
Help: "Bucketed histogram of s3 request processing time.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
- }, []string{"type"})
+ }, []string{"type", "bucket"})
)
func init() {
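Adding the "bucket" label widens the metric's cardinality: every call site must now
supply a bucket value, and each distinct bucket becomes its own time series. A minimal
sketch of recording against the widened label set with the standard Prometheus client:

	package main

	import "github.com/prometheus/client_golang/prometheus"

	var s3RequestCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "SeaweedFS",
			Subsystem: "s3",
			Name:      "request_total",
			Help:      "Counter of s3 requests.",
		}, []string{"type", "code", "bucket"})

	func main() {
		prometheus.MustRegister(s3RequestCounter)
		// label values are positional: type, code, bucket
		s3RequestCounter.WithLabelValues("GET", "200", "my-bucket").Inc()
	}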
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 289fd3b47..847324838 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -44,13 +44,11 @@ func GenerateDirUuid(dir string) (dirUuidString string, err error) {
dirUuidString = dirUuid.String()
writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
if writeErr != nil {
- glog.Warningf("failed to write uuid to %s : %v", fileName, writeErr)
return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
}
} else {
uuidData, readErr := os.ReadFile(fileName)
if readErr != nil {
- glog.Warningf("failed to read uuid from %s : %v", fileName, readErr)
return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
}
dirUuidString = string(uuidData)
@@ -65,7 +63,10 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSp
} else {
idxDir = util.ResolvePath(idxDir)
}
- dirUuid, _ := GenerateDirUuid(dir)
+ dirUuid, err := GenerateDirUuid(dir)
+ if err != nil {
+ glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
+ }
location := &DiskLocation{
Directory: dir,
DirectoryUuid: dirUuid,
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 3d2047f99..ccce8f108 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -8,7 +8,7 @@ import (
)
const (
- batch = 100000
+ batch = 10000
)
type SectionalNeedleId uint32
diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go
index 199cb26b3..afe12ee72 100644
--- a/weed/storage/needle_map/compact_map_test.go
+++ b/weed/storage/needle_map/compact_map_test.go
@@ -2,10 +2,25 @@ package needle_map
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/sequence"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"testing"
)
+func TestSnowflakeSequencer(t *testing.T) {
+ m := NewCompactMap()
+ seq, _ := sequence.NewSnowflakeSequencer("for_test", 1)
+
+ for i := 0; i < 200000; i++ {
+ id := seq.NextFileId(1)
+ oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073)
+ if oldSize != 0 {
+ t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize)
+ }
+	}
+}
+
func TestOverflow2(t *testing.T) {
m := NewCompactMap()
_, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 147220f4a..e53aa2853 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
errCount := int32(0)
for index, dn := range locationlist.list {
go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
- err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
})
@@ -123,7 +123,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
isReadOnly := false
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
@@ -150,7 +150,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})
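All three vacuum RPCs here are unary calls, so the first argument to
operation.WithVolumeServerClient, which appears to choose a long-lived streaming
connection when true, is switched to false. A minimal sketch of the wrapper shape this
assumes (withVolumeServerClient and its string "client" are stand-ins):

	package main

	import "fmt"

	// withVolumeServerClient sketches the assumed wrapper: a connection-mode
	// flag, a server address, and a callback that receives a ready client.
	func withVolumeServerClient(streamingMode bool, addr string, fn func(client string) error) error {
		mode := "short-lived"
		if streamingMode {
			mode = "streaming"
		}
		return fn(fmt.Sprintf("%s connection to %s", mode, addr))
	}

	func main() {
		_ = withVolumeServerClient(false, "volume1:18080", func(client string) error {
			fmt.Println("VacuumVolumeCheck over", client) // unary RPC, no stream needed
			return nil
		})
	}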
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 7886c3998..238ca99f4 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -3,6 +3,7 @@ package topology
import (
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"sync"
@@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) {
if targetCount == 0 {
targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
}
- count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
- if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
- return count, nil
+ result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
+ if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return result, nil
}
- return count, err
+ return result, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
- counter += c
+ if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
+ result = append(result, res...)
} else {
- glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e)
- return counter, e
+ glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
+ return result, e
}
}
return
}
-func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
- return 0, e
+ return nil, e
}
vid, raftErr := topo.NextVolumeId()
if raftErr != nil {
- return 0, raftErr
+ return nil, raftErr
}
- err := vg.grow(grpcDialOption, topo, vid, option, servers...)
- return len(servers), err
+ if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
+ for _, server := range servers {
+ result = append(result, &master_pb.VolumeLocation{
+ Url: server.Url(),
+ PublicUrl: server.PublicUrl,
+ NewVids: []uint32{uint32(vid)},
+ })
+ }
+ }
+ return
}
// 1. find the main data node
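findAndGrow now reports exactly where each new volume landed instead of a bare count,
presumably so callers can publish the new locations right away. A minimal sketch of
consuming the new return value, with a local struct standing in for
master_pb.VolumeLocation:

	package main

	import "fmt"

	// volumeLocation mirrors the master_pb.VolumeLocation fields used above.
	type volumeLocation struct {
		Url       string
		PublicUrl string
		NewVids   []uint32
	}

	func main() {
		result := []*volumeLocation{
			{Url: "vol1:8080", PublicUrl: "vol1.example.com:8080", NewVids: []uint32{7}},
			{Url: "vol2:8080", PublicUrl: "vol2.example.com:8080", NewVids: []uint32{7}},
		}
		for _, loc := range result {
			fmt.Printf("volumes %v created on %s (public %s)\n", loc.NewVids, loc.Url, loc.PublicUrl)
		}
	}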
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 167aee8ea..dee82762a 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -493,9 +493,9 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
for vid, vll := range vl.vid2location {
size, fileCount := vll.Stats(vid, freshThreshold)
ret.FileCount += uint64(fileCount)
- ret.UsedSize += size
+ ret.UsedSize += size * uint64(vll.Length())
if vl.readonlyVolumes.IsTrue(vid) {
- ret.TotalSize += size
+ ret.TotalSize += size * uint64(vll.Length())
} else {
ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length())
}
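The fix accounts for replication: vll.Stats evidently returns the size of a single
replica, while vll.Length() is the number of servers holding a copy, so cluster-wide
usage is their product. For example, with two replicas of a 10 GB volume:

	package main

	import "fmt"

	func main() {
		size := uint64(10 << 30) // bytes reported for one replica of a volume
		replicas := uint64(2)    // vll.Length(): servers holding a copy
		fmt.Printf("%d GB actually consumed across the cluster\n", (size*replicas)>>30)
	}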
diff --git a/weed/util/constants.go b/weed/util/constants.go
index c0fea8b17..ce8757ce9 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION_NUMBER = fmt.Sprintf("%.02f", 3.11)
+ VERSION_NUMBER = fmt.Sprintf("%.02f", 3.13)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 244a3921a..35f1c4cf8 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -38,6 +38,39 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy
}
}
+func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return mc.LookupFileIdWithFallback
+}
+
+func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
+ fullUrls, err = mc.vidMap.LookupFileId(fileId)
+ if err == nil {
+ return
+ }
+ err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: []string{fileId},
+ })
+ if err != nil {
+ return err
+ }
+ for vid, vidLocation := range resp.VolumeIdLocations {
+ for _, vidLoc := range vidLocation.Locations {
+ loc := Location{
+ Url: vidLoc.Url,
+ PublicUrl: vidLoc.PublicUrl,
+ GrpcPort: int(vidLoc.GrpcPort),
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
+ }
+ }
+
+ return nil
+ })
+ return
+}
+
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
return mc.currentMaster
@@ -98,7 +131,6 @@ func (mc *MasterClient) tryAllMasters() {
}
mc.currentMaster = ""
- mc.vidMap = newVidMap("")
}
}
@@ -126,9 +158,25 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
return err
}
-
glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
+
+ resp, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
+ return err
+ }
+
+ // check if it is the leader to determine whether to reset the vidMap
+ if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
+ nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
+ return nil
+ }
+
mc.currentMaster = master
+ mc.vidMap = newVidMap("")
for {
resp, err := stream.Recv()
@@ -140,8 +188,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" {
- glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
return nil
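LookupFileIdWithFallback is a cache-aside lookup: consult the local vidMap first, and
only on a miss ask the current master via LookupVolume, backfilling the map so later
lookups stay local. A minimal standalone sketch of the pattern (lookupFromMaster and
the plain map stand in for the gRPC call and the vidMap):

	package main

	import "fmt"

	var cache = map[string][]string{} // stands in for the vidMap

	// lookupFromMaster stands in for the unary LookupVolume gRPC call.
	func lookupFromMaster(fileId string) ([]string, error) {
		return []string{"http://volume1:8080/" + fileId}, nil
	}

	func lookupFileIdWithFallback(fileId string) ([]string, error) {
		if urls, ok := cache[fileId]; ok {
			return urls, nil // cache hit: no master round trip
		}
		urls, err := lookupFromMaster(fileId)
		if err != nil {
			return nil, err
		}
		cache[fileId] = urls // backfill so the next lookup is local
		return urls, nil
	}

	func main() {
		urls, _ := lookupFileIdWithFallback("3,01637037d6")
		fmt.Println(urls)
	}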
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index cdd783d91..754c77051 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -90,10 +90,6 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
-func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
- return vc.LookupFileId
-}
-
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
@@ -133,7 +129,7 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
return
}
locations, found = vc.ecVid2Locations[vid]
- return
+ return locations, found && len(locations) > 0
}
func (vc *vidMap) addLocation(vid uint32, location Location) {