aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/cluster/cluster.go10
-rw-r--r--weed/cluster/cluster_test.go34
-rw-r--r--weed/command/filer.go26
-rw-r--r--weed/command/filer_sync.go4
-rw-r--r--weed/command/imports.go2
-rw-r--r--weed/command/master.go5
-rw-r--r--weed/command/mount.go2
-rw-r--r--weed/command/scaffold/filer.toml10
-rw-r--r--weed/command/update_full.go4
-rw-r--r--weed/filer/tikv/tikv.go6
-rw-r--r--weed/filer/tikv/tikv_store.go396
-rw-r--r--weed/filer/tikv/tikv_store_kv.go50
-rw-r--r--weed/mount/inode_to_path.go15
-rw-r--r--weed/mount/weedfs_dir_mkrm.go2
-rw-r--r--weed/pb/filer_pb/signature.go13
-rw-r--r--weed/pb/grpc_client_server.go5
-rw-r--r--weed/pb/remote.proto5
-rw-r--r--weed/remote_storage/hdfs/doc.go9
-rw-r--r--weed/remote_storage/hdfs/hdfs_kerberos.go58
-rw-r--r--weed/remote_storage/hdfs/hdfs_storage_client.go194
-rw-r--r--weed/s3api/s3api_circuit_breaker.go19
-rw-r--r--weed/s3api/s3api_circuit_breaker_test.go38
-rw-r--r--weed/s3api/s3api_object_copy_handlers.go8
-rw-r--r--weed/s3api/s3api_object_multipart_handlers.go4
-rw-r--r--weed/security/tls.go140
-rw-r--r--weed/sequence/snowflake_sequencer_test.go25
-rw-r--r--weed/server/master_grpc_server.go8
-rw-r--r--weed/server/master_grpc_server_volume.go10
-rw-r--r--weed/server/master_server_handlers_admin.go5
-rw-r--r--weed/server/raft_hashicorp.go5
-rw-r--r--weed/server/raft_server.go2
-rw-r--r--weed/server/volume_grpc_copy.go24
-rw-r--r--weed/server/webdav_server.go8
-rw-r--r--weed/shell/command_remote_configure.go21
-rw-r--r--weed/storage/disk_location.go7
-rw-r--r--weed/storage/needle_map/compact_map_test.go15
-rw-r--r--weed/topology/topology_vacuum.go6
-rw-r--r--weed/topology/volume_growth.go39
-rw-r--r--weed/topology/volume_layout.go4
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/wdclient/masterclient.go56
-rw-r--r--weed/wdclient/vid_map.go4
42 files changed, 850 insertions, 450 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index 6c24df44c..ad6e6b879 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -46,8 +46,6 @@ func NewCluster() *Cluster {
}
func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers {
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
filers, found := cluster.filerGroup2filers[filerGroup]
if !found && createIfNotFound {
filers = &Filers{
@@ -63,6 +61,8 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
filers := cluster.getFilers(filerGroup, true)
if existingNode, found := filers.filers[address]; found {
existingNode.counter++
@@ -115,6 +115,8 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
filers := cluster.getFilers(filerGroup, false)
if filers == nil {
return nil
@@ -165,12 +167,12 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) {
switch nodeType {
case FilerType:
+ cluster.filersLock.RLock()
+ defer cluster.filersLock.RUnlock()
filers := cluster.getFilers(filerGroup, false)
if filers == nil {
return
}
- cluster.filersLock.RLock()
- defer cluster.filersLock.RUnlock()
for _, node := range filers.filers {
nodes = append(nodes, node)
}
diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go
index ccaccf6f7..1187642de 100644
--- a/weed/cluster/cluster_test.go
+++ b/weed/cluster/cluster_test.go
@@ -3,6 +3,8 @@ package cluster
import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/stretchr/testify/assert"
+ "strconv"
+ "sync"
"testing"
)
@@ -45,3 +47,35 @@ func TestClusterAddRemoveNodes(t *testing.T) {
c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
}
+
+func TestConcurrentAddRemoveNodes(t *testing.T) {
+ c := NewCluster()
+ var wg sync.WaitGroup
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ address := strconv.Itoa(i)
+ c.AddClusterNode("", "filer", pb.ServerAddress(address), "23.45")
+ }(i)
+ }
+ wg.Wait()
+
+ for i := 0; i < 50; i++ {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ address := strconv.Itoa(i)
+ node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address))
+
+ if len(node) == 0 {
+ t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address)
+ return
+ } else if node[0].ClusterNodeUpdate.Address != address {
+ t.Errorf("TestConcurrentAddRemoveNodes: expect:%s, actual:%s", address, node[0].ClusterNodeUpdate.Address)
+ return
+ }
+ }(i)
+ }
+ wg.Wait()
+}
diff --git a/weed/command/filer.go b/weed/command/filer.go
index c9f9a1956..7e0e92d4a 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -6,10 +6,13 @@ import (
"net/http"
"os"
"runtime"
+ "sort"
+ "strings"
"time"
"google.golang.org/grpc/reflection"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -114,10 +117,8 @@ func init() {
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
-var cmdFiler = &Command{
- UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
- Short: "start a file server that points to a master server, or a list of master servers",
- Long: `start a file server which accepts REST operation for any files.
+func filerLongDesc() string {
+ desc := `start a file server which accepts REST operation for any files.
//create or overwrite the file, the directories /path/to will be automatically created
POST /path/to/file
@@ -133,7 +134,22 @@ var cmdFiler = &Command{
The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
-`,
+Supported Filer Stores:
+`
+
+ storeNames := make([]string, len(filer.Stores))
+ for i, store := range filer.Stores {
+ storeNames[i] = "\t" + store.GetName()
+ }
+ sort.Strings(storeNames)
+ storeList := strings.Join(storeNames, "\n")
+ return desc + storeList
+}
+
+var cmdFiler = &Command{
+ UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
+ Short: "start a file server that points to a master server, or a list of master servers",
+ Long: filerLongDesc(),
}
func runFiler(cmd *Command, args []string) bool {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index b7da1baf9..1550d155a 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -215,10 +215,10 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
return persistEventFn(resp)
}
- var lastLogTsNs = time.Now().Nanosecond()
+ var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
- now := time.Now().Nanosecond()
+ now := time.Now().UnixNano()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
// collect synchronous offset
diff --git a/weed/command/imports.go b/weed/command/imports.go
index 04079b162..afdbc5a10 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -5,7 +5,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs"
- _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
@@ -32,5 +31,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
_ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
)
diff --git a/weed/command/master.go b/weed/command/master.go
index 9587df055..ab8466d47 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,9 +1,11 @@
package command
import (
+ "fmt"
"golang.org/x/exp/slices"
"net/http"
"os"
+ "path"
"strings"
"time"
@@ -151,11 +153,12 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}
// start raftServer
+ metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port))
raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
Peers: masterPeers,
ServerAddr: myMasterAddress,
- DataDir: util.ResolvePath(*masterOption.metaFolder),
+ DataDir: util.ResolvePath(metaDir),
Topo: ms.Topo,
RaftResumeState: *masterOption.raftResumeState,
HeartbeatInterval: *masterOption.heartbeatInterval,
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 0e32a53e8..0046ca03d 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -86,7 +86,7 @@ var cmdMount = &Command{
This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on
Linux, and OS X.
- On OS X, it requires OSXFUSE (http://osxfuse.github.com/).
+ On OS X, it requires OSXFUSE (https://osxfuse.github.io/).
`,
}
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 595fb2e62..c82de8da0 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -327,3 +327,13 @@ location = "/tmp/"
address = "localhost:6379"
password = ""
database = 1
+
+[tikv]
+enabled = false
+# If you have multiple PD addresses, separate them with ',' like this:
+# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379"
+pdaddrs = "localhost:2379"
+# Concurrency for TiKV delete range
+deleterange_concurrency = 1
+# Enable 1PC
+enable_1pc = false
diff --git a/weed/command/update_full.go b/weed/command/update_full.go
index 529f38219..185203aee 100644
--- a/weed/command/update_full.go
+++ b/weed/command/update_full.go
@@ -1,5 +1,5 @@
-//go:build elastic && ydb && gocdk && hdfs
-// +build elastic,ydb,gocdk,hdfs
+//go:build elastic && ydb && gocdk && tikv
+// +build elastic,ydb,gocdk,tikv
package command
diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go
new file mode 100644
index 000000000..ba1da27a8
--- /dev/null
+++ b/weed/filer/tikv/tikv.go
@@ -0,0 +1,6 @@
+/*
+ * Package tikv is for TiKV filer store.
+ * This empty file exists so that "go build" works without the tikv build tag.
+ * Use "make full_install" to enable the TiKV filer store.
+ */
+package tikv
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
new file mode 100644
index 000000000..f8932663d
--- /dev/null
+++ b/weed/filer/tikv/tikv_store.go
@@ -0,0 +1,396 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "bytes"
+ "context"
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+var (
+ _ filer.FilerStore = ((*TikvStore)(nil))
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+ client *txnkv.Client
+ deleteRangeConcurrency int
+ onePC bool
+}
+
+// Basic APIs
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
+ pdAddrs := []string{}
+ pdAddrsStr := config.GetString(prefix + "pdaddrs")
+ for _, item := range strings.Split(pdAddrsStr, ",") {
+ pdAddrs = append(pdAddrs, strings.TrimSpace(item))
+ }
+ drc := config.GetInt(prefix + "deleterange_concurrency")
+ if drc <= 0 {
+ drc = 1
+ }
+ store.onePC = config.GetBool(prefix + "enable_1pc")
+ store.deleteRangeConcurrency = drc
+ return store.initialize(pdAddrs)
+}
+
+func (store *TikvStore) initialize(pdAddrs []string) error {
+ client, err := txnkv.NewClient(pdAddrs)
+ store.client = client
+ return err
+}
+
+func (store *TikvStore) Shutdown() {
+ err := store.client.Close()
+ if err != nil {
+ glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
+ }
+}
+
+// ~ Basic APIs
+
+// Entry APIs
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ dir, name := entry.DirAndName()
+ key := generateKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+ return nil
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var value []byte = nil
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ value = val
+ }
+ return err
+ })
+
+ if isNotExists(err) || value == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", path, err)
+ }
+
+ entry := &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ return nil
+}
+
+// ~ Entry APIs
+
+// Directory APIs
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
+ directoryPrefix := genDirectoryKeyPrefix(path, "")
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ var (
+ startKey []byte = nil
+ endKey []byte = nil
+ )
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(directoryPrefix, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ endKey = key
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ if startKey == nil {
+ startKey = key
+ }
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+		// Only one key matched; just delete it directly.
+ if startKey != nil && bytes.Equal(startKey, endKey) {
+ return txn.Delete(startKey)
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+
+ if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
+ // has startKey and endKey and they are not equals, so use delete range
+ _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ }
+ return err
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ lastFileName := ""
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return lastFileName, err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(lastFileStart, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ i := int64(0)
+ first := true
+ for iter.Valid() {
+ if first {
+ first = false
+ if !includeStartFile {
+ if iter.Valid() {
+ // Check first item is lastFileStart
+ if bytes.Equal(iter.Key(), lastFileStart) {
+						// The key equals lastFileStart and includeStartFile
+						// is false, so skip this entry.
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ continue
+ }
+ }
+ }
+ }
+ // Check for limitation
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+ // Validate key prefix
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ value := iter.Value()
+
+ // Start process
+ fileName := getNameFromKey(key)
+ if fileName != "" {
+ // Got file name, then generate the Entry
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), fileName),
+ }
+ // Update lastFileName
+ lastFileName = fileName
+ // Check for decode value.
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ // Got error just return the error
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return err
+ }
+ // Run for each callback if return false just break the iteration
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ // End process
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+ return lastFileName, nil
+}
+
+// ~ Directory APIs
+
+// Transaction Related APIs
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.client.Begin()
+ if err != nil {
+ return ctx, err
+ }
+ if store.onePC {
+ tx.SetEnable1PC(store.onePC)
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Commit(context.Background())
+ }
+ return nil
+}
+
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+// ~ Transaction Related APIs
+
+// Transaction Wrapper
+type TxnWrapper struct {
+ *txnkv.KVTxn
+ inContext bool
+}
+
+func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
+ err := f(w.KVTxn)
+ if !w.inContext {
+ if err != nil {
+ w.KVTxn.Rollback()
+ return err
+ }
+ w.KVTxn.Commit(context.Background())
+ return nil
+ }
+ return err
+}
+
+func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return &TxnWrapper{tx, true}, nil
+ }
+ txn, err := store.client.Begin()
+ if err != nil {
+ return nil, err
+ }
+ if store.onePC {
+ txn.SetEnable1PC(store.onePC)
+ }
+ return &TxnWrapper{txn, false}, nil
+}
+
+// ~ Transaction Wrapper
+
+// Encoding Functions
+func hashToBytes(dir string) []byte {
+ h := sha1.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func generateKey(dirPath, fileName string) []byte {
+ key := hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func getNameFromKey(key []byte) string {
+ return string(key[sha1.Size:])
+}
+
+func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func isNotExists(err error) bool {
+ if err == nil {
+ return false
+ }
+ if err.Error() == "not exist" {
+ return true
+ }
+ return false
+}
+
+// ~ Encoding Functions
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
new file mode 100644
index 000000000..1d9428c69
--- /dev/null
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -0,0 +1,50 @@
+//go:build tikv
+// +build tikv
+
+package tikv
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+}
+
+func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var data []byte = nil
+ err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ data = val
+ }
+ return err
+ })
+ if isNotExists(err) {
+ return data, filer.ErrKvNotFound
+ }
+ return data, err
+}
+
+func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+}
diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go
index e465158e8..fa17a9261 100644
--- a/weed/mount/inode_to_path.go
+++ b/weed/mount/inode_to_path.go
@@ -157,6 +157,10 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn
defer i.Unlock()
sourceInode, sourceFound := i.path2inode[sourcePath]
targetInode, targetFound := i.path2inode[targetPath]
+ if targetFound {
+ delete(i.inode2path, targetInode)
+ delete(i.path2inode, targetPath)
+ }
if sourceFound {
delete(i.path2inode, sourcePath)
i.path2inode[targetPath] = sourceInode
@@ -165,11 +169,14 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn
// so no need to worry about their source inodes
return
}
- i.inode2path[sourceInode].FullPath = targetPath
- if targetFound {
- delete(i.inode2path, targetInode)
+ if entry, entryFound := i.inode2path[sourceInode]; entryFound {
+ entry.FullPath = targetPath
+ entry.isChildrenCached = false
+ if !targetFound {
+ entry.nlookup++
+ }
} else {
- i.inode2path[sourceInode].nlookup++
+ glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode)
}
return targetInode
}
diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go
index 289a0bc2c..4246f0a4c 100644
--- a/weed/mount/weedfs_dir_mkrm.go
+++ b/weed/mount/weedfs_dir_mkrm.go
@@ -104,7 +104,7 @@ func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string
glog.V(3).Infof("remove directory: %v", entryFullPath)
ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
- err := filer_pb.Remove(wfs, string(dirFullPath), name, true, true, ignoreRecursiveErr, false, []int32{wfs.signature})
+ err := filer_pb.Remove(wfs, string(dirFullPath), name, true, false, ignoreRecursiveErr, false, []int32{wfs.signature})
if err != nil {
glog.V(0).Infof("remove %s: %v", entryFullPath, err)
if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
diff --git a/weed/pb/filer_pb/signature.go b/weed/pb/filer_pb/signature.go
deleted file mode 100644
index e13afc656..000000000
--- a/weed/pb/filer_pb/signature.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package filer_pb
-
-func (r *CreateEntryRequest) AddSignature(sig int32) {
- r.Signatures = append(r.Signatures, sig)
-}
-func (r *CreateEntryRequest) HasSigned(sig int32) bool {
- for _, s := range r.Signatures {
- if s == sig {
- return true
- }
- }
- return false
-}
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index d89e61433..990cf74f9 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -46,8 +46,9 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
var options []grpc.ServerOption
options = append(options,
grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: 10 * time.Second, // wait time before ping if no activity
- Timeout: 20 * time.Second, // ping timeout
+ Time: 10 * time.Second, // wait time before ping if no activity
+ Timeout: 20 * time.Second, // ping timeout
+ MaxConnectionAge: 10 * time.Hour,
}),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: 60 * time.Second, // min time a client should wait before sending a ping
diff --git a/weed/pb/remote.proto b/weed/pb/remote.proto
index bdecf4dbe..13f7a878b 100644
--- a/weed/pb/remote.proto
+++ b/weed/pb/remote.proto
@@ -49,11 +49,6 @@ message RemoteConf {
string wasabi_endpoint = 42;
string wasabi_region = 43;
- repeated string hdfs_namenodes = 50;
- string hdfs_username = 51;
- string hdfs_service_principal_name = 52;
- string hdfs_data_transfer_protection = 53;
-
string filebase_access_key = 60;
string filebase_secret_key = 61;
string filebase_endpoint = 62;
diff --git a/weed/remote_storage/hdfs/doc.go b/weed/remote_storage/hdfs/doc.go
deleted file mode 100644
index 086c9de3f..000000000
--- a/weed/remote_storage/hdfs/doc.go
+++ /dev/null
@@ -1,9 +0,0 @@
-/*
-
-Package hdfs is for remote hdfs storage.
-
-The referenced "github.com/colinmarc/hdfs/v2" library is too big when compiled.
-So this is only compiled in "make full_install".
-
-*/
-package hdfs
diff --git a/weed/remote_storage/hdfs/hdfs_kerberos.go b/weed/remote_storage/hdfs/hdfs_kerberos.go
deleted file mode 100644
index ba152020a..000000000
--- a/weed/remote_storage/hdfs/hdfs_kerberos.go
+++ /dev/null
@@ -1,58 +0,0 @@
-//go:build hdfs
-// +build hdfs
-
-package hdfs
-
-import (
- "fmt"
- "os"
- "os/user"
- "strings"
-
- krb "github.com/jcmturner/gokrb5/v8/client"
- "github.com/jcmturner/gokrb5/v8/config"
- "github.com/jcmturner/gokrb5/v8/credentials"
-)
-
-// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go
-func getKerberosClient() (*krb.Client, error) {
- configPath := os.Getenv("KRB5_CONFIG")
- if configPath == "" {
- configPath = "/etc/krb5.conf"
- }
-
- cfg, err := config.Load(configPath)
- if err != nil {
- return nil, err
- }
-
- // Determine the ccache location from the environment, falling back to the
- // default location.
- ccachePath := os.Getenv("KRB5CCNAME")
- if strings.Contains(ccachePath, ":") {
- if strings.HasPrefix(ccachePath, "FILE:") {
- ccachePath = strings.SplitN(ccachePath, ":", 2)[1]
- } else {
- return nil, fmt.Errorf("unusable ccache: %s", ccachePath)
- }
- } else if ccachePath == "" {
- u, err := user.Current()
- if err != nil {
- return nil, err
- }
-
- ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
- }
-
- ccache, err := credentials.LoadCCache(ccachePath)
- if err != nil {
- return nil, err
- }
-
- client, err := krb.NewFromCCache(ccache, cfg)
- if err != nil {
- return nil, err
- }
-
- return client, nil
-}
diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go
deleted file mode 100644
index 3b71958fd..000000000
--- a/weed/remote_storage/hdfs/hdfs_storage_client.go
+++ /dev/null
@@ -1,194 +0,0 @@
-//go:build hdfs
-// +build hdfs
-
-package hdfs
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
- "github.com/chrislusf/seaweedfs/weed/remote_storage"
- "github.com/chrislusf/seaweedfs/weed/util"
- hdfs "github.com/colinmarc/hdfs/v2"
- "io"
- "os"
- "path"
-)
-
-func init() {
- remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker)
-}
-
-type hdfsRemoteStorageMaker struct{}
-
-func (s hdfsRemoteStorageMaker) HasBucket() bool {
- return false
-}
-
-func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
- client := &hdfsRemoteStorageClient{
- conf: conf,
- }
-
- options := hdfs.ClientOptions{
- Addresses: conf.HdfsNamenodes,
- UseDatanodeHostname: false,
- }
-
- if conf.HdfsServicePrincipalName != "" {
- var err error
- options.KerberosClient, err = getKerberosClient()
- if err != nil {
- return nil, fmt.Errorf("get kerberos authentication: %s", err)
- }
- options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName
-
- if conf.HdfsDataTransferProtection != "" {
- options.DataTransferProtection = conf.HdfsDataTransferProtection
- }
- } else {
- options.User = conf.HdfsUsername
- }
-
- c, err := hdfs.NewClient(options)
- if err != nil {
- return nil, err
- }
-
- client.client = c
- return client, nil
-}
-
-type hdfsRemoteStorageClient struct {
- conf *remote_pb.RemoteConf
- client *hdfs.Client
-}
-
-var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
-
-func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
-
- return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
- children, err := c.client.ReadDir(string(parentDir))
- if err != nil {
- return err
- }
- for _, child := range children {
- if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{
- StorageName: c.conf.Name,
- LastLocalSyncTsNs: 0,
- RemoteETag: "",
- RemoteMtime: child.ModTime().Unix(),
- RemoteSize: child.Size(),
- }); err != nil {
- return nil
- }
- }
- return nil
- }, util.FullPath(loc.Path), visitFn)
-
-}
-func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
-
- f, err := c.client.Open(loc.Path)
- if err != nil {
- return
- }
- defer f.Close()
- data = make([]byte, size)
- _, err = f.ReadAt(data, offset)
-
- return
-
-}
-
-func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
- return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode))
-}
-
-func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
- return c.client.RemoveAll(loc.Path)
-}
-
-func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
-
- dirname := path.Dir(loc.Path)
-
- // ensure parent directory
- if err = c.client.MkdirAll(dirname, 0755); err != nil {
- return
- }
-
- // remove existing file
- info, err := c.client.Stat(loc.Path)
- if err == nil {
- err = c.client.Remove(loc.Path)
- if err != nil {
- return
- }
- }
-
- // create new file
- out, err := c.client.Create(loc.Path)
- if err != nil {
- return
- }
-
- cleanup := func() {
- if removeErr := c.client.Remove(loc.Path); removeErr != nil {
- glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr)
- }
- }
-
- if _, err = io.Copy(out, reader); err != nil {
- cleanup()
- return
- }
-
- if err = out.Close(); err != nil {
- cleanup()
- return
- }
-
- info, err = c.client.Stat(loc.Path)
- if err != nil {
- return
- }
-
- return &filer_pb.RemoteEntry{
- RemoteMtime: info.ModTime().Unix(),
- RemoteSize: info.Size(),
- RemoteETag: "",
- StorageName: c.conf.Name,
- }, nil
-
-}
-
-func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
- if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode {
- if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
- if err = c.client.Remove(loc.Path); err != nil {
- return fmt.Errorf("hdfs delete %s: %v", loc.Path, err)
- }
- return
-}
-
-func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
- return
-}
-
-func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) {
- return
-}
-
-func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) {
- return
-}
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
index 7c8311d21..68fb0a5d2 100644
--- a/weed/s3api/s3api_circuit_breaker.go
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -16,7 +16,7 @@ import (
)
type CircuitBreaker struct {
- sync.Mutex
+ sync.RWMutex
Enabled bool
counters map[string]*int64
limitations map[string]int64
@@ -37,7 +37,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
})
if err != nil {
- glog.Warningf("fail to load config: %v", err)
+ glog.Infof("s3 circuit breaker not configured: %v", err)
}
return cb
@@ -110,7 +110,7 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request),
func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) {
//bucket simultaneous request count
- bucketCountRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest)
+ bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
if bucketCountRollBack != nil {
rollback = append(rollback, bucketCountRollBack)
}
@@ -119,7 +119,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//bucket simultaneous request content bytes
- bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed)
+ bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
if bucketContentLengthRollBack != nil {
rollback = append(rollback, bucketContentLengthRollBack)
}
@@ -128,7 +128,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//global simultaneous request count
- globalCountRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest)
+ globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
if globalCountRollBack != nil {
rollback = append(rollback, globalCountRollBack)
}
@@ -137,7 +137,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
}
//global simultaneous request content bytes
- globalContentLengthRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed)
+ globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
if globalContentLengthRollBack != nil {
rollback = append(rollback, globalContentLengthRollBack)
}
@@ -147,11 +147,13 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (
return
}
-func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
- key := s3_constants.Concat(bucket, action, limitType)
+func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
e = s3err.ErrNone
if max, ok := cb.limitations[key]; ok {
+ cb.RLock()
counter, exists := cb.counters[key]
+ cb.RUnlock()
+
if !exists {
cb.Lock()
counter, exists = cb.counters[key]
@@ -171,7 +173,6 @@ func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string
f = func() {
atomic.AddInt64(counter, -inc)
}
- current = atomic.LoadInt64(counter)
if current > max {
e = errCode
return
diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go
index f795b75fc..5848cf164 100644
--- a/weed/s3api/s3api_circuit_breaker_test.go
+++ b/weed/s3api/s3api_circuit_breaker_test.go
@@ -11,28 +11,38 @@ import (
)
type TestLimitCase struct {
- actionName string
+ actionName string
+
limitType string
bucketLimitValue int64
globalLimitValue int64
routineCount int
- reqBytes int64
-
successCount int64
}
var (
bucket = "/test"
- action = s3_constants.ACTION_READ
+ action = s3_constants.ACTION_WRITE
+ fileSize int64 = 200
+
TestLimitCases = []*TestLimitCase{
- {action, s3_constants.LimitTypeCount, 5, 5, 6, 1024, 5},
- {action, s3_constants.LimitTypeCount, 6, 6, 6, 1024, 6},
- {action, s3_constants.LimitTypeCount, 5, 6, 6, 1024, 5},
- {action, s3_constants.LimitTypeBytes, 1024, 1024, 6, 200, 5},
- {action, s3_constants.LimitTypeBytes, 1200, 1200, 6, 200, 6},
- {action, s3_constants.LimitTypeBytes, 11990, 11990, 60, 200, 59},
- {action, s3_constants.LimitTypeBytes, 11790, 11990, 70, 200, 58},
+
+ //bucket-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 5, 6, 60, 5},
+ {action, s3_constants.LimitTypeCount, 0, 6, 6, 0},
+
+ //global-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 6, 5, 6, 5},
+ {action, s3_constants.LimitTypeCount, 6, 0, 6, 0},
+
+ //bucket-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0},
+
+ //global-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0},
}
)
@@ -64,14 +74,14 @@ func TestLimit(t *testing.T) {
t.Fatal(err)
}
- successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: tc.reqBytes})
+ successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName)
if successCount != tc.successCount {
- t.Errorf("successCount not equal, expect=%d, actual=%d", tc.successCount, successCount)
+ t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc)
}
}
}
-func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request) int64 {
+func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 {
var successCounter int64
resultCh := make(chan []func(), routineCount)
var wg sync.WaitGroup
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index 370b5a911..950e7a8fb 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -80,8 +80,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
return
}
- dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject), dstBucket)
+ dstUrl := fmt.Sprintf("http://%s%s/%s%s",
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject))
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
@@ -169,8 +169,8 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
rangeHeader := r.Header.Get("x-amz-copy-source-range")
- dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket)
+ dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 768f4d180..d2ff87832 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -247,8 +247,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, bucket)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
if partID == 1 && r.Header.Get("Content-Type") == "" {
dataReader = mimeDetect(r, dataReader)
diff --git a/weed/security/tls.go b/weed/security/tls.go
index 79552c026..bfa9d43c7 100644
--- a/weed/security/tls.go
+++ b/weed/security/tls.go
@@ -1,24 +1,22 @@
package security
import (
- "context"
"crypto/tls"
"crypto/x509"
+ "fmt"
+ "google.golang.org/grpc/credentials/tls/certprovider/pemfile"
+ "google.golang.org/grpc/security/advancedtls"
"io/ioutil"
- "os"
"strings"
-
- grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/peer"
- "google.golang.org/grpc/status"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
)
+const credRefreshingInterval = 5 * time.Hour
+
type Authenticator struct {
AllowedWildcardDomain string
AllowedCommonNames map[string]bool
@@ -29,28 +27,39 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption
return nil, nil
}
- // load cert/key, ca cert
- cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key"))
+ serverOptions := pemfile.Options{
+ CertFile: config.GetString(component + ".cert"),
+ KeyFile: config.GetString(component + ".key"),
+ RefreshDuration: credRefreshingInterval,
+ }
+
+ serverIdentityProvider, err := pemfile.NewProvider(serverOptions)
if err != nil {
- glog.V(1).Infof("load cert: %s / key: %s error: %v",
- config.GetString(component+".cert"),
- config.GetString(component+".key"),
- err)
+ glog.Warningf("pemfile.NewProvider(%v) for %v failed: %v", serverOptions, component, err)
return nil, nil
}
- caCert, err := os.ReadFile(config.GetString("grpc.ca"))
+
+ serverRootOptions := pemfile.Options{
+ RootFile: config.GetString("grpc.ca"),
+ RefreshDuration: credRefreshingInterval,
+ }
+ serverRootProvider, err := pemfile.NewProvider(serverRootOptions)
if err != nil {
- glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err)
+ glog.Warningf("pemfile.NewProvider(%v) failed: %v", serverRootOptions, err)
return nil, nil
}
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM(caCert)
- ta := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{cert},
- ClientCAs: caCertPool,
- ClientAuth: tls.RequireAndVerifyClientCert,
- })
+ // Start a server and create a client using advancedtls API with Provider.
+ options := &advancedtls.ServerOptions{
+ IdentityOptions: advancedtls.IdentityCertificateOptions{
+ IdentityProvider: serverIdentityProvider,
+ },
+ RootOptions: advancedtls.RootCertificateOptions{
+ RootProvider: serverRootProvider,
+ },
+ RequireClientCert: true,
+ VType: advancedtls.CertVerification,
+ }
allowedCommonNames := config.GetString(component + ".allowed_commonNames")
allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain")
if allowedCommonNames != "" || allowedWildcardDomain != "" {
@@ -62,7 +71,16 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption
AllowedCommonNames: allowedCommonNamesMap,
AllowedWildcardDomain: allowedWildcardDomain,
}
- return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate))
+ options.VerifyPeer = auther.Authenticate
+ } else {
+ options.VerifyPeer = func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ return &advancedtls.VerificationResults{}, nil
+ }
+ }
+ ta, err := advancedtls.NewServerCreds(options)
+ if err != nil {
+ glog.Warningf("advancedtls.NewServerCreds(%v) failed: %v", options, err)
+ return nil, nil
}
return grpc.Creds(ta), nil
}
@@ -77,25 +95,42 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption {
return grpc.WithInsecure()
}
- // load cert/key, cacert
- cert, err := tls.LoadX509KeyPair(certFileName, keyFileName)
+ clientOptions := pemfile.Options{
+ CertFile: certFileName,
+ KeyFile: keyFileName,
+ RefreshDuration: credRefreshingInterval,
+ }
+ clientProvider, err := pemfile.NewProvider(clientOptions)
if err != nil {
- glog.V(1).Infof("load cert/key error: %v", err)
+ glog.Warningf("pemfile.NewProvider(%v) failed: %v", clientOptions, err)
return grpc.WithInsecure()
}
- caCert, err := os.ReadFile(caFileName)
+ clientRootOptions := pemfile.Options{
+ RootFile: config.GetString("grpc.ca"),
+ RefreshDuration: credRefreshingInterval,
+ }
+ clientRootProvider, err := pemfile.NewProvider(clientRootOptions)
if err != nil {
- glog.V(1).Infof("read ca cert file error: %v", err)
+ glog.Warningf("pemfile.NewProvider(%v) failed: %v", clientRootOptions, err)
+ return grpc.WithInsecure()
+ }
+ options := &advancedtls.ClientOptions{
+ IdentityOptions: advancedtls.IdentityCertificateOptions{
+ IdentityProvider: clientProvider,
+ },
+ VerifyPeer: func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ return &advancedtls.VerificationResults{}, nil
+ },
+ RootOptions: advancedtls.RootCertificateOptions{
+ RootProvider: clientRootProvider,
+ },
+ VType: advancedtls.CertVerification,
+ }
+ ta, err := advancedtls.NewClientCreds(options)
+ if err != nil {
+ glog.Warningf("advancedtls.NewClientCreds(%v) failed: %v", options, err)
return grpc.WithInsecure()
}
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM(caCert)
-
- ta := credentials.NewTLS(&tls.Config{
- Certificates: []tls.Certificate{cert},
- RootCAs: caCertPool,
- InsecureSkipVerify: true,
- })
return grpc.WithTransportCredentials(ta)
}
@@ -116,27 +151,14 @@ func LoadClientTLSHTTP(clientCertFile string) *tls.Config {
}
}
-func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) {
- p, ok := peer.FromContext(ctx)
- if !ok {
- return ctx, status.Error(codes.Unauthenticated, "no peer found")
+func (a Authenticator) Authenticate(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) {
+ if a.AllowedWildcardDomain != "" && strings.HasSuffix(params.Leaf.Subject.CommonName, a.AllowedWildcardDomain) {
+ return &advancedtls.VerificationResults{}, nil
}
-
- tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
- if !ok {
- return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
- }
- if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
- return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate")
- }
-
- commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
- if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) {
- return ctx, nil
- }
- if _, ok := a.AllowedCommonNames[commonName]; ok {
- return ctx, nil
+ if _, ok := a.AllowedCommonNames[params.Leaf.Subject.CommonName]; ok {
+ return &advancedtls.VerificationResults{}, nil
}
-
- return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName)
+ err := fmt.Errorf("Authenticate: invalid subject client common name: %s", params.Leaf.Subject.CommonName)
+ glog.Error(err)
+ return nil, err
}
diff --git a/weed/sequence/snowflake_sequencer_test.go b/weed/sequence/snowflake_sequencer_test.go
new file mode 100644
index 000000000..731e330c5
--- /dev/null
+++ b/weed/sequence/snowflake_sequencer_test.go
@@ -0,0 +1,25 @@
+package sequence
+
+import (
+ "encoding/hex"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestSequencer(t *testing.T) {
+ seq, err := NewSnowflakeSequencer("for_test", 1)
+ assert.Equal(t, nil, err)
+ last := uint64(0)
+ bytes := make([]byte, types.NeedleIdSize)
+ for i := 0; i < 100; i++ {
+ next := seq.NextFileId(1)
+ types.NeedleIdToBytes(bytes, types.NeedleId(next))
+ t.Log(hex.EncodeToString(bytes))
+ if last == next {
+ t.Errorf("last %d next %d", last, next)
+ }
+ last = next
+ }
+
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 4d0fbbc41..4f5455cb1 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -263,8 +263,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
ms.deleteClient(clientName)
}()
-
- for _, message := range ms.Topo.ToVolumeLocations() {
+ for i, message := range ms.Topo.ToVolumeLocations() {
+ if i == 0 {
+ if leader, err := ms.Topo.Leader(); err == nil {
+ message.Leader = string(leader)
+ }
+ }
if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil {
return sendErr
}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index bc92dd332..9da947869 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -52,8 +52,13 @@ func (ms *MasterServer) ProcessGrowRequest() {
go func() {
glog.V(1).Infoln("starting automatic volume grow")
start := time.Now()
- _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
+ newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
+ if err == nil {
+ for _, newVidLocation := range newVidLocations {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
+ }
+ }
vl.DoneGrowRequest()
if req.ErrCh != nil {
@@ -204,8 +209,9 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
+ totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
- TotalSize: stats.TotalSize,
+ TotalSize: uint64(totalSize),
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index ade750ccc..47abfb892 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"net/http"
"strconv"
@@ -81,7 +82,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
- count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ var newVidLocations []*master_pb.VolumeLocation
+ newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ count = len(newVidLocations)
}
} else {
err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index cc6578bf5..9971eaa48 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -121,7 +121,10 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
if option.RaftBootstrap {
os.RemoveAll(path.Join(s.dataDir, ldbFile))
os.RemoveAll(path.Join(s.dataDir, sdbFile))
- os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshots"))
+ }
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshots"), os.ModePerm); err != nil {
+ return nil, err
}
baseDir := s.dataDir
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 8c372f0cc..ad0a1c8ce 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -125,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
return nil, err
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index e3ec5b724..b4bc850e2 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"io"
"math"
"os"
@@ -78,6 +80,28 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
}()
+ var preallocateSize int64
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
+ }
+ if resp.VolumePreallocate {
+ preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20)
+ }
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
+ }
+
+ if preallocateSize > 0 {
+ volumeFile := dataBaseFileName + ".dat"
+ _, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0)
+ if err != nil {
+ return fmt.Errorf("create volume file %s: %v", volumeFile, err)
+ }
+ }
+
// println("source:", volFileInfoResp.String())
copyResponse := &volume_server_pb.VolumeCopyResponse{}
reportInterval := int64(1024 * 1024 * 128)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 265dea03a..5140af2b4 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -48,6 +48,13 @@ type WebDavServer struct {
Handler *webdav.Handler
}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}
+
func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
fs, _ := NewWebDavFileSystem(option)
@@ -496,6 +503,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
written, err := f.bufWriter.Write(buf)
if err == nil {
+ f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
f.off += int64(written)
}
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index c892c3443..e15090190 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -108,16 +108,6 @@ func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, write
remoteConfigureCommand.StringVar(&conf.StorjSecretKey, "storj.secret_key", "", "Storj secret key")
remoteConfigureCommand.StringVar(&conf.StorjEndpoint, "storj.endpoint", "", "Storj endpoint")
- var namenodes arrayFlags
- remoteConfigureCommand.Var(&namenodes, "hdfs.namenodes", "hdfs name node and port, example: namenode1:8020,namenode2:8020")
- remoteConfigureCommand.StringVar(&conf.HdfsUsername, "hdfs.username", "", "hdfs user name")
- remoteConfigureCommand.StringVar(&conf.HdfsServicePrincipalName, "hdfs.servicePrincipalName", "", `Kerberos service principal name for the namenode
-
-Example: hdfs/namenode.hadoop.docker
-Namenode running as service 'hdfs' with FQDN 'namenode.hadoop.docker'.
-`)
- remoteConfigureCommand.StringVar(&conf.HdfsDataTransferProtection, "hdfs.dataTransferProtection", "", "[authentication|integrity|privacy] Kerberos data transfer protection")
-
if err = remoteConfigureCommand.Parse(args); err != nil {
return nil
}
@@ -223,14 +213,3 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write
return nil
}
-
-type arrayFlags []string
-
-func (i *arrayFlags) String() string {
- return "my string representation"
-}
-
-func (i *arrayFlags) Set(value string) error {
- *i = append(*i, value)
- return nil
-}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 289fd3b47..847324838 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -44,13 +44,11 @@ func GenerateDirUuid(dir string) (dirUuidString string, err error) {
dirUuidString = dirUuid.String()
writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644)
if writeErr != nil {
- glog.Warningf("failed to write uuid to %s : %v", fileName, writeErr)
return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr)
}
} else {
uuidData, readErr := os.ReadFile(fileName)
if readErr != nil {
- glog.Warningf("failed to read uuid from %s : %v", fileName, readErr)
return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr)
}
dirUuidString = string(uuidData)
@@ -65,7 +63,10 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSp
} else {
idxDir = util.ResolvePath(idxDir)
}
- dirUuid, _ := GenerateDirUuid(dir)
+ dirUuid, err := GenerateDirUuid(dir)
+ if err != nil {
+ glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err)
+ }
location := &DiskLocation{
Directory: dir,
DirectoryUuid: dirUuid,
diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go
index 199cb26b3..afe12ee72 100644
--- a/weed/storage/needle_map/compact_map_test.go
+++ b/weed/storage/needle_map/compact_map_test.go
@@ -2,10 +2,25 @@ package needle_map
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/sequence"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"testing"
)
+func TestSnowflakeSequencer(t *testing.T) {
+ m := NewCompactMap()
+ seq, _ := sequence.NewSnowflakeSequencer("for_test", 1)
+
+ for i := 0; i < 200000; i++ {
+ id := seq.NextFileId(1)
+ oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073)
+ if oldSize != 0 {
+ t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize)
+ }
+ }
+
+}
+
func TestOverflow2(t *testing.T) {
m := NewCompactMap()
_, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073)
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 147220f4a..e53aa2853 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
errCount := int32(0)
for index, dn := range locationlist.list {
go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
- err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
})
@@ -123,7 +123,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
isReadOnly := false
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
@@ -150,7 +150,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
index 7886c3998..238ca99f4 100644
--- a/weed/topology/volume_growth.go
+++ b/weed/topology/volume_growth.go
@@ -3,6 +3,7 @@ package topology
import (
"encoding/json"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"sync"
@@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) {
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) {
if targetCount == 0 {
targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
}
- count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
- if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
- return count, nil
+ result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
+ if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return result, nil
}
- return count, err
+ return result, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
- counter += c
+ if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
+ result = append(result, res...)
} else {
- glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e)
- return counter, e
+ glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
+ return result, e
}
}
return
}
-func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) {
+func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
- return 0, e
+ return nil, e
}
vid, raftErr := topo.NextVolumeId()
if raftErr != nil {
- return 0, raftErr
+ return nil, raftErr
}
- err := vg.grow(grpcDialOption, topo, vid, option, servers...)
- return len(servers), err
+ if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil {
+ for _, server := range servers {
+ result = append(result, &master_pb.VolumeLocation{
+ Url: server.Url(),
+ PublicUrl: server.PublicUrl,
+ NewVids: []uint32{uint32(vid)},
+ })
+ }
+ }
+ return
}
// 1. find the main data node
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 167aee8ea..dee82762a 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -493,9 +493,9 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
for vid, vll := range vl.vid2location {
size, fileCount := vll.Stats(vid, freshThreshold)
ret.FileCount += uint64(fileCount)
- ret.UsedSize += size
+ ret.UsedSize += size * uint64(vll.Length())
if vl.readonlyVolumes.IsTrue(vid) {
- ret.TotalSize += size
+ ret.TotalSize += size * uint64(vll.Length())
} else {
ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length())
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index c0fea8b17..ce8757ce9 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION_NUMBER = fmt.Sprintf("%.02f", 3.11)
+ VERSION_NUMBER = fmt.Sprintf("%.02f", 3.13)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 244a3921a..35f1c4cf8 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -38,6 +38,39 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy
}
}
+func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return mc.LookupFileIdWithFallback
+}
+
+func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
+ fullUrls, err = mc.vidMap.LookupFileId(fileId)
+ if err == nil {
+ return
+ }
+ err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: []string{fileId},
+ })
+ if err != nil {
+ return err
+ }
+ for vid, vidLocation := range resp.VolumeIdLocations {
+ for _, vidLoc := range vidLocation.Locations {
+ loc := Location{
+ Url: vidLoc.Url,
+ PublicUrl: vidLoc.PublicUrl,
+ GrpcPort: int(vidLoc.GrpcPort),
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
+ }
+ }
+
+ return nil
+ })
+ return
+}
+
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
return mc.currentMaster
@@ -98,7 +131,6 @@ func (mc *MasterClient) tryAllMasters() {
}
mc.currentMaster = ""
- mc.vidMap = newVidMap("")
}
}
@@ -126,9 +158,25 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
return err
}
-
glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
+
+ resp, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
+ return err
+ }
+
+ // check if it is the leader to determine whether to reset the vidMap
+ if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
+ nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
+ return nil
+ }
+
mc.currentMaster = master
+ mc.vidMap = newVidMap("")
for {
resp, err := stream.Recv()
@@ -140,8 +188,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" {
- glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
return nil
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index f7a9a0f1a..754c77051 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -90,10 +90,6 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
-func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType {
- return vc.LookupFileId
-}
-
func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {