diff options
Diffstat (limited to 'weed')
42 files changed, 850 insertions, 450 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 6c24df44c..ad6e6b879 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -46,8 +46,6 @@ func NewCluster() *Cluster { } func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers { - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() filers, found := cluster.filerGroup2filers[filerGroup] if !found && createIfNotFound { filers = &Filers{ @@ -63,6 +61,8 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd filerGroup := FilerGroup(ns) switch nodeType { case FilerType: + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() filers := cluster.getFilers(filerGroup, true) if existingNode, found := filers.filers[address]; found { existingNode.counter++ @@ -115,6 +115,8 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb filerGroup := FilerGroup(ns) switch nodeType { case FilerType: + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() filers := cluster.getFilers(filerGroup, false) if filers == nil { return nil @@ -165,12 +167,12 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) { switch nodeType { case FilerType: + cluster.filersLock.RLock() + defer cluster.filersLock.RUnlock() filers := cluster.getFilers(filerGroup, false) if filers == nil { return } - cluster.filersLock.RLock() - defer cluster.filersLock.RUnlock() for _, node := range filers.filers { nodes = append(nodes, node) } diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go index ccaccf6f7..1187642de 100644 --- a/weed/cluster/cluster_test.go +++ b/weed/cluster/cluster_test.go @@ -3,6 +3,8 @@ package cluster import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/stretchr/testify/assert" + "strconv" + "sync" "testing" ) @@ -45,3 +47,35 @@ func TestClusterAddRemoveNodes(t *testing.T) { c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1")) } + +func TestConcurrentAddRemoveNodes(t *testing.T) { + c := NewCluster() + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + address := strconv.Itoa(i) + c.AddClusterNode("", "filer", pb.ServerAddress(address), "23.45") + }(i) + } + wg.Wait() + + for i := 0; i < 50; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + address := strconv.Itoa(i) + node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address)) + + if len(node) == 0 { + t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address) + return + } else if node[0].ClusterNodeUpdate.Address != address { + t.Errorf("TestConcurrentAddRemoveNodes: expect:%s, actual:%s", address, node[0].ClusterNodeUpdate.Address) + return + } + }(i) + } + wg.Wait() +} diff --git a/weed/command/filer.go b/weed/command/filer.go index c9f9a1956..7e0e92d4a 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -6,10 +6,13 @@ import ( "net/http" "os" "runtime" + "sort" + "strings" "time" "google.golang.org/grpc/reflection" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -114,10 +117,8 @@ func init() { filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port") } -var cmdFiler = &Command{ - UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*", - Short: "start a file server that points to a master server, or a list of master servers", - Long: `start a file server which accepts REST operation for any files. +func filerLongDesc() string { + desc := `start a file server which accepts REST operation for any files. //create or overwrite the file, the directories /path/to will be automatically created POST /path/to/file @@ -133,7 +134,22 @@ var cmdFiler = &Command{ The example filer.toml configuration file can be generated by "weed scaffold -config=filer" -`, +Supported Filer Stores: +` + + storeNames := make([]string, len(filer.Stores)) + for i, store := range filer.Stores { + storeNames[i] = "\t" + store.GetName() + } + sort.Strings(storeNames) + storeList := strings.Join(storeNames, "\n") + return desc + storeList +} + +var cmdFiler = &Command{ + UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*", + Short: "start a file server that points to a master server, or a list of master servers", + Long: filerLongDesc(), } func runFiler(cmd *Command, args []string) bool { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index b7da1baf9..1550d155a 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -215,10 +215,10 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, return persistEventFn(resp) } - var lastLogTsNs = time.Now().Nanosecond() + var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { - now := time.Now().Nanosecond() + now := time.Now().UnixNano() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now // collect synchronous offset diff --git a/weed/command/imports.go b/weed/command/imports.go index 04079b162..afdbc5a10 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -5,7 +5,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" @@ -32,5 +31,6 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/tikv" _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" ) diff --git a/weed/command/master.go b/weed/command/master.go index 9587df055..ab8466d47 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -1,9 +1,11 @@ package command import ( + "fmt" "golang.org/x/exp/slices" "net/http" "os" + "path" "strings" "time" @@ -151,11 +153,12 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } // start raftServer + metaDir := path.Join(*masterOption.metaFolder, fmt.Sprintf("m%d", *masterOption.port)) raftServerOption := &weed_server.RaftServerOption{ GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"), Peers: masterPeers, ServerAddr: myMasterAddress, - DataDir: util.ResolvePath(*masterOption.metaFolder), + DataDir: util.ResolvePath(metaDir), Topo: ms.Topo, RaftResumeState: *masterOption.raftResumeState, HeartbeatInterval: *masterOption.heartbeatInterval, diff --git a/weed/command/mount.go b/weed/command/mount.go index 0e32a53e8..0046ca03d 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -86,7 +86,7 @@ var cmdMount = &Command{ This uses github.com/seaweedfs/fuse, which enables writing FUSE file systems on Linux, and OS X. - On OS X, it requires OSXFUSE (http://osxfuse.github.com/). + On OS X, it requires OSXFUSE (https://osxfuse.github.io/). `, } diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 595fb2e62..c82de8da0 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -327,3 +327,13 @@ location = "/tmp/" address = "localhost:6379" password = "" database = 1 + +[tikv] +enabled = false +# If you have many pd address, use ',' split then: +# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379" +pdaddrs = "localhost:2379" +# Concurrency for TiKV delete range +deleterange_concurrency = 1 +# Enable 1PC +enable_1pc = false diff --git a/weed/command/update_full.go b/weed/command/update_full.go index 529f38219..185203aee 100644 --- a/weed/command/update_full.go +++ b/weed/command/update_full.go @@ -1,5 +1,5 @@ -//go:build elastic && ydb && gocdk && hdfs -// +build elastic,ydb,gocdk,hdfs +//go:build elastic && ydb && gocdk && tikv +// +build elastic,ydb,gocdk,tikv package command diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go new file mode 100644 index 000000000..ba1da27a8 --- /dev/null +++ b/weed/filer/tikv/tikv.go @@ -0,0 +1,6 @@ +/* + * Package tikv is for TiKV filer store. + * This empty file is let go build can work without tikv tag + * Using "make full_install" to enable TiKV filer store. + */ +package tikv diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go new file mode 100644 index 000000000..f8932663d --- /dev/null +++ b/weed/filer/tikv/tikv_store.go @@ -0,0 +1,396 @@ +//go:build tikv +// +build tikv + +package tikv + +import ( + "bytes" + "context" + "crypto/sha1" + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tikv/client-go/v2/txnkv" +) + +var ( + _ filer.FilerStore = ((*TikvStore)(nil)) +) + +func init() { + filer.Stores = append(filer.Stores, &TikvStore{}) +} + +type TikvStore struct { + client *txnkv.Client + deleteRangeConcurrency int + onePC bool +} + +// Basic APIs +func (store *TikvStore) GetName() string { + return "tikv" +} + +func (store *TikvStore) Initialize(config util.Configuration, prefix string) error { + pdAddrs := []string{} + pdAddrsStr := config.GetString(prefix + "pdaddrs") + for _, item := range strings.Split(pdAddrsStr, ",") { + pdAddrs = append(pdAddrs, strings.TrimSpace(item)) + } + drc := config.GetInt(prefix + "deleterange_concurrency") + if drc <= 0 { + drc = 1 + } + store.onePC = config.GetBool(prefix + "enable_1pc") + store.deleteRangeConcurrency = drc + return store.initialize(pdAddrs) +} + +func (store *TikvStore) initialize(pdAddrs []string) error { + client, err := txnkv.NewClient(pdAddrs) + store.client = client + return err +} + +func (store *TikvStore) Shutdown() { + err := store.client.Close() + if err != nil { + glog.V(0).Infof("Shutdown TiKV client got error: %v", err) + } +} + +// ~ Basic APIs + +// Entry APIs +func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { + dir, name := entry.DirAndName() + key := generateKey(dir, name) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + return nil +} + +func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + return store.InsertEntry(ctx, entry) +} + +func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var value []byte = nil + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + value = val + } + return err + }) + + if isNotExists(err) || value == nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", path, err) + } + + entry := &filer.Entry{ + FullPath: path, + } + err = entry.DecodeAttributesAndChunks(value) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil +} + +func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + return nil +} + +// ~ Entry APIs + +// Directory APIs +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { + directoryPrefix := genDirectoryKeyPrefix(path, "") + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + var ( + startKey []byte = nil + endKey []byte = nil + ) + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(directoryPrefix, nil) + if err != nil { + return err + } + defer iter.Close() + for iter.Valid() { + key := iter.Key() + endKey = key + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + if startKey == nil { + startKey = key + } + + err = iter.Next() + if err != nil { + return err + } + } + // Only one Key matched just delete it. + if startKey != nil && bytes.Equal(startKey, endKey) { + return txn.Delete(startKey) + } + return nil + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + + if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) { + // has startKey and endKey and they are not equals, so use delete range + _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + } + return err +} + +func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + lastFileName := "" + directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) + } + + txn, err := store.getTxn(ctx) + if err != nil { + return lastFileName, err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(lastFileStart, nil) + if err != nil { + return err + } + defer iter.Close() + i := int64(0) + first := true + for iter.Valid() { + if first { + first = false + if !includeStartFile { + if iter.Valid() { + // Check first item is lastFileStart + if bytes.Equal(iter.Key(), lastFileStart) { + // Is lastFileStart and not include start file, just + // ignore it. + err = iter.Next() + if err != nil { + return err + } + continue + } + } + } + } + // Check for limitation + if limit > 0 { + i++ + if i > limit { + break + } + } + // Validate key prefix + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + value := iter.Value() + + // Start process + fileName := getNameFromKey(key) + if fileName != "" { + // Got file name, then generate the Entry + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), fileName), + } + // Update lastFileName + lastFileName = fileName + // Check for decode value. + if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil { + // Got error just return the error + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + return err + } + // Run for each callback if return false just break the iteration + if !eachEntryFunc(entry) { + break + } + } + // End process + + err = iter.Next() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) + } + return lastFileName, nil +} + +// ~ Directory APIs + +// Transaction Related APIs +func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.client.Begin() + if err != nil { + return ctx, err + } + if store.onePC { + tx.SetEnable1PC(store.onePC) + } + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *TikvStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Commit(context.Background()) + } + return nil +} + +func (store *TikvStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Rollback() + } + return nil +} + +// ~ Transaction Related APIs + +// Transaction Wrapper +type TxnWrapper struct { + *txnkv.KVTxn + inContext bool +} + +func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { + err := f(w.KVTxn) + if !w.inContext { + if err != nil { + w.KVTxn.Rollback() + return err + } + w.KVTxn.Commit(context.Background()) + return nil + } + return err +} + +func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return &TxnWrapper{tx, true}, nil + } + txn, err := store.client.Begin() + if err != nil { + return nil, err + } + if store.onePC { + txn.SetEnable1PC(store.onePC) + } + return &TxnWrapper{txn, false}, nil +} + +// ~ Transaction Wrapper + +// Encoding Functions +func hashToBytes(dir string) []byte { + h := sha1.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return b +} + +func generateKey(dirPath, fileName string) []byte { + key := hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func getNameFromKey(key []byte) string { + return string(key[sha1.Size:]) +} + +func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func isNotExists(err error) bool { + if err == nil { + return false + } + if err.Error() == "not exist" { + return true + } + return false +} + +// ~ Encoding Functions diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go new file mode 100644 index 000000000..1d9428c69 --- /dev/null +++ b/weed/filer/tikv/tikv_store_kv.go @@ -0,0 +1,50 @@ +//go:build tikv +// +build tikv + +package tikv + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/tikv/client-go/v2/txnkv" +) + +func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) +} + +func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { + tw, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var data []byte = nil + err = tw.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + data = val + } + return err + }) + if isNotExists(err) { + return data, filer.ErrKvNotFound + } + return data, err +} + +func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) +} diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go index e465158e8..fa17a9261 100644 --- a/weed/mount/inode_to_path.go +++ b/weed/mount/inode_to_path.go @@ -157,6 +157,10 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn defer i.Unlock() sourceInode, sourceFound := i.path2inode[sourcePath] targetInode, targetFound := i.path2inode[targetPath] + if targetFound { + delete(i.inode2path, targetInode) + delete(i.path2inode, targetPath) + } if sourceFound { delete(i.path2inode, sourcePath) i.path2inode[targetPath] = sourceInode @@ -165,11 +169,14 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) (replacedIn // so no need to worry about their source inodes return } - i.inode2path[sourceInode].FullPath = targetPath - if targetFound { - delete(i.inode2path, targetInode) + if entry, entryFound := i.inode2path[sourceInode]; entryFound { + entry.FullPath = targetPath + entry.isChildrenCached = false + if !targetFound { + entry.nlookup++ + } } else { - i.inode2path[sourceInode].nlookup++ + glog.Errorf("MovePath %s to %s: sourceInode %d not found", sourcePath, targetPath, sourceInode) } return targetInode } diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go index 289a0bc2c..4246f0a4c 100644 --- a/weed/mount/weedfs_dir_mkrm.go +++ b/weed/mount/weedfs_dir_mkrm.go @@ -104,7 +104,7 @@ func (wfs *WFS) Rmdir(cancel <-chan struct{}, header *fuse.InHeader, name string glog.V(3).Infof("remove directory: %v", entryFullPath) ignoreRecursiveErr := true // ignore recursion error since the OS should manage it - err := filer_pb.Remove(wfs, string(dirFullPath), name, true, true, ignoreRecursiveErr, false, []int32{wfs.signature}) + err := filer_pb.Remove(wfs, string(dirFullPath), name, true, false, ignoreRecursiveErr, false, []int32{wfs.signature}) if err != nil { glog.V(0).Infof("remove %s: %v", entryFullPath, err) if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { diff --git a/weed/pb/filer_pb/signature.go b/weed/pb/filer_pb/signature.go deleted file mode 100644 index e13afc656..000000000 --- a/weed/pb/filer_pb/signature.go +++ /dev/null @@ -1,13 +0,0 @@ -package filer_pb - -func (r *CreateEntryRequest) AddSignature(sig int32) { - r.Signatures = append(r.Signatures, sig) -} -func (r *CreateEntryRequest) HasSigned(sig int32) bool { - for _, s := range r.Signatures { - if s == sig { - return true - } - } - return false -} diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index d89e61433..990cf74f9 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -46,8 +46,9 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { var options []grpc.ServerOption options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, // wait time before ping if no activity - Timeout: 20 * time.Second, // ping timeout + Time: 10 * time.Second, // wait time before ping if no activity + Timeout: 20 * time.Second, // ping timeout + MaxConnectionAge: 10 * time.Hour, }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 60 * time.Second, // min time a client should wait before sending a ping diff --git a/weed/pb/remote.proto b/weed/pb/remote.proto index bdecf4dbe..13f7a878b 100644 --- a/weed/pb/remote.proto +++ b/weed/pb/remote.proto @@ -49,11 +49,6 @@ message RemoteConf { string wasabi_endpoint = 42; string wasabi_region = 43; - repeated string hdfs_namenodes = 50; - string hdfs_username = 51; - string hdfs_service_principal_name = 52; - string hdfs_data_transfer_protection = 53; - string filebase_access_key = 60; string filebase_secret_key = 61; string filebase_endpoint = 62; diff --git a/weed/remote_storage/hdfs/doc.go b/weed/remote_storage/hdfs/doc.go deleted file mode 100644 index 086c9de3f..000000000 --- a/weed/remote_storage/hdfs/doc.go +++ /dev/null @@ -1,9 +0,0 @@ -/* - -Package hdfs is for remote hdfs storage. - -The referenced "github.com/colinmarc/hdfs/v2" library is too big when compiled. -So this is only compiled in "make full_install". - -*/ -package hdfs diff --git a/weed/remote_storage/hdfs/hdfs_kerberos.go b/weed/remote_storage/hdfs/hdfs_kerberos.go deleted file mode 100644 index ba152020a..000000000 --- a/weed/remote_storage/hdfs/hdfs_kerberos.go +++ /dev/null @@ -1,58 +0,0 @@ -//go:build hdfs -// +build hdfs - -package hdfs - -import ( - "fmt" - "os" - "os/user" - "strings" - - krb "github.com/jcmturner/gokrb5/v8/client" - "github.com/jcmturner/gokrb5/v8/config" - "github.com/jcmturner/gokrb5/v8/credentials" -) - -// copy-paste from https://github.com/colinmarc/hdfs/blob/master/cmd/hdfs/kerberos.go -func getKerberosClient() (*krb.Client, error) { - configPath := os.Getenv("KRB5_CONFIG") - if configPath == "" { - configPath = "/etc/krb5.conf" - } - - cfg, err := config.Load(configPath) - if err != nil { - return nil, err - } - - // Determine the ccache location from the environment, falling back to the - // default location. - ccachePath := os.Getenv("KRB5CCNAME") - if strings.Contains(ccachePath, ":") { - if strings.HasPrefix(ccachePath, "FILE:") { - ccachePath = strings.SplitN(ccachePath, ":", 2)[1] - } else { - return nil, fmt.Errorf("unusable ccache: %s", ccachePath) - } - } else if ccachePath == "" { - u, err := user.Current() - if err != nil { - return nil, err - } - - ccachePath = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid) - } - - ccache, err := credentials.LoadCCache(ccachePath) - if err != nil { - return nil, err - } - - client, err := krb.NewFromCCache(ccache, cfg) - if err != nil { - return nil, err - } - - return client, nil -} diff --git a/weed/remote_storage/hdfs/hdfs_storage_client.go b/weed/remote_storage/hdfs/hdfs_storage_client.go deleted file mode 100644 index 3b71958fd..000000000 --- a/weed/remote_storage/hdfs/hdfs_storage_client.go +++ /dev/null @@ -1,194 +0,0 @@ -//go:build hdfs -// +build hdfs - -package hdfs - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/remote_storage" - "github.com/chrislusf/seaweedfs/weed/util" - hdfs "github.com/colinmarc/hdfs/v2" - "io" - "os" - "path" -) - -func init() { - remote_storage.RemoteStorageClientMakers["hdfs"] = new(hdfsRemoteStorageMaker) -} - -type hdfsRemoteStorageMaker struct{} - -func (s hdfsRemoteStorageMaker) HasBucket() bool { - return false -} - -func (s hdfsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { - client := &hdfsRemoteStorageClient{ - conf: conf, - } - - options := hdfs.ClientOptions{ - Addresses: conf.HdfsNamenodes, - UseDatanodeHostname: false, - } - - if conf.HdfsServicePrincipalName != "" { - var err error - options.KerberosClient, err = getKerberosClient() - if err != nil { - return nil, fmt.Errorf("get kerberos authentication: %s", err) - } - options.KerberosServicePrincipleName = conf.HdfsServicePrincipalName - - if conf.HdfsDataTransferProtection != "" { - options.DataTransferProtection = conf.HdfsDataTransferProtection - } - } else { - options.User = conf.HdfsUsername - } - - c, err := hdfs.NewClient(options) - if err != nil { - return nil, err - } - - client.client = c - return client, nil -} - -type hdfsRemoteStorageClient struct { - conf *remote_pb.RemoteConf - client *hdfs.Client -} - -var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{}) - -func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) { - - return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error { - children, err := c.client.ReadDir(string(parentDir)) - if err != nil { - return err - } - for _, child := range children { - if err := visitFn(string(parentDir), child.Name(), child.IsDir(), &filer_pb.RemoteEntry{ - StorageName: c.conf.Name, - LastLocalSyncTsNs: 0, - RemoteETag: "", - RemoteMtime: child.ModTime().Unix(), - RemoteSize: child.Size(), - }); err != nil { - return nil - } - } - return nil - }, util.FullPath(loc.Path), visitFn) - -} -func (c *hdfsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) { - - f, err := c.client.Open(loc.Path) - if err != nil { - return - } - defer f.Close() - data = make([]byte, size) - _, err = f.ReadAt(data, offset) - - return - -} - -func (c *hdfsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) { - return c.client.MkdirAll(loc.Path, os.FileMode(entry.Attributes.FileMode)) -} - -func (c *hdfsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) { - return c.client.RemoveAll(loc.Path) -} - -func (c *hdfsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) { - - dirname := path.Dir(loc.Path) - - // ensure parent directory - if err = c.client.MkdirAll(dirname, 0755); err != nil { - return - } - - // remove existing file - info, err := c.client.Stat(loc.Path) - if err == nil { - err = c.client.Remove(loc.Path) - if err != nil { - return - } - } - - // create new file - out, err := c.client.Create(loc.Path) - if err != nil { - return - } - - cleanup := func() { - if removeErr := c.client.Remove(loc.Path); removeErr != nil { - glog.Errorf("clean up %s%s: %v", loc.Name, loc.Path, removeErr) - } - } - - if _, err = io.Copy(out, reader); err != nil { - cleanup() - return - } - - if err = out.Close(); err != nil { - cleanup() - return - } - - info, err = c.client.Stat(loc.Path) - if err != nil { - return - } - - return &filer_pb.RemoteEntry{ - RemoteMtime: info.ModTime().Unix(), - RemoteSize: info.Size(), - RemoteETag: "", - StorageName: c.conf.Name, - }, nil - -} - -func (c *hdfsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error { - if oldEntry.Attributes.FileMode != newEntry.Attributes.FileMode { - if err := c.client.Chmod(loc.Path, os.FileMode(newEntry.Attributes.FileMode)); err != nil { - return err - } - } - return nil -} - -func (c *hdfsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) { - if err = c.client.Remove(loc.Path); err != nil { - return fmt.Errorf("hdfs delete %s: %v", loc.Path, err) - } - return -} - -func (c *hdfsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) { - return -} - -func (c *hdfsRemoteStorageClient) CreateBucket(name string) (err error) { - return -} - -func (c *hdfsRemoteStorageClient) DeleteBucket(name string) (err error) { - return -} diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 7c8311d21..68fb0a5d2 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -16,7 +16,7 @@ import ( ) type CircuitBreaker struct { - sync.Mutex + sync.RWMutex Enabled bool counters map[string]*int64 limitations map[string]int64 @@ -37,7 +37,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { }) if err != nil { - glog.Warningf("fail to load config: %v", err) + glog.Infof("s3 circuit breaker not configured: %v", err) } return cb @@ -110,7 +110,7 @@ func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) { //bucket simultaneous request count - bucketCountRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest) + bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) if bucketCountRollBack != nil { rollback = append(rollback, bucketCountRollBack) } @@ -119,7 +119,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( } //bucket simultaneous request content bytes - bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(bucket, action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) + bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) if bucketContentLengthRollBack != nil { rollback = append(rollback, bucketContentLengthRollBack) } @@ -128,7 +128,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( } //global simultaneous request count - globalCountRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeCount, 1, s3err.ErrTooManyRequest) + globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest) if globalCountRollBack != nil { rollback = append(rollback, globalCountRollBack) } @@ -137,7 +137,7 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( } //global simultaneous request content bytes - globalContentLengthRollBack, errCode := cb.loadCounterAndCompare("", action, s3_constants.LimitTypeBytes, r.ContentLength, s3err.ErrRequestBytesExceed) + globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed) if globalContentLengthRollBack != nil { rollback = append(rollback, globalContentLengthRollBack) } @@ -147,11 +147,13 @@ func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) ( return } -func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { - key := s3_constants.Concat(bucket, action, limitType) +func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) { e = s3err.ErrNone if max, ok := cb.limitations[key]; ok { + cb.RLock() counter, exists := cb.counters[key] + cb.RUnlock() + if !exists { cb.Lock() counter, exists = cb.counters[key] @@ -171,7 +173,6 @@ func (cb *CircuitBreaker) loadCounterAndCompare(bucket, action, limitType string f = func() { atomic.AddInt64(counter, -inc) } - current = atomic.LoadInt64(counter) if current > max { e = errCode return diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go index f795b75fc..5848cf164 100644 --- a/weed/s3api/s3api_circuit_breaker_test.go +++ b/weed/s3api/s3api_circuit_breaker_test.go @@ -11,28 +11,38 @@ import ( ) type TestLimitCase struct { - actionName string + actionName string + limitType string bucketLimitValue int64 globalLimitValue int64 routineCount int - reqBytes int64 - successCount int64 } var ( bucket = "/test" - action = s3_constants.ACTION_READ + action = s3_constants.ACTION_WRITE + fileSize int64 = 200 + TestLimitCases = []*TestLimitCase{ - {action, s3_constants.LimitTypeCount, 5, 5, 6, 1024, 5}, - {action, s3_constants.LimitTypeCount, 6, 6, 6, 1024, 6}, - {action, s3_constants.LimitTypeCount, 5, 6, 6, 1024, 5}, - {action, s3_constants.LimitTypeBytes, 1024, 1024, 6, 200, 5}, - {action, s3_constants.LimitTypeBytes, 1200, 1200, 6, 200, 6}, - {action, s3_constants.LimitTypeBytes, 11990, 11990, 60, 200, 59}, - {action, s3_constants.LimitTypeBytes, 11790, 11990, 70, 200, 58}, + + //bucket-LimitTypeCount + {action, s3_constants.LimitTypeCount, 5, 6, 60, 5}, + {action, s3_constants.LimitTypeCount, 0, 6, 6, 0}, + + //global-LimitTypeCount + {action, s3_constants.LimitTypeCount, 6, 5, 6, 5}, + {action, s3_constants.LimitTypeCount, 6, 0, 6, 0}, + + //bucket-LimitTypeBytes + {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5}, + {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0}, + + //global-LimitTypeBytes + {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5}, + {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0}, } ) @@ -64,14 +74,14 @@ func TestLimit(t *testing.T) { t.Fatal(err) } - successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: tc.reqBytes}) + successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName) if successCount != tc.successCount { - t.Errorf("successCount not equal, expect=%d, actual=%d", tc.successCount, successCount) + t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc) } } } -func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request) int64 { +func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 { var successCounter int64 resultCh := make(chan []func(), routineCount) var wg sync.WaitGroup diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go index 370b5a911..950e7a8fb 100644 --- a/weed/s3api/s3api_object_copy_handlers.go +++ b/weed/s3api/s3api_object_copy_handlers.go @@ -80,8 +80,8 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request return } - dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s", - s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject), dstBucket) + dstUrl := fmt.Sprintf("http://%s%s/%s%s", + s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject)) srcUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject)) @@ -169,8 +169,8 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req rangeHeader := r.Header.Get("x-amz-copy-source-range") - dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket) + dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part", + s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID) srcUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject)) diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 768f4d180..d2ff87832 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -247,8 +247,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID) - uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, bucket) + uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part", + s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID) if partID == 1 && r.Header.Get("Content-Type") == "" { dataReader = mimeDetect(r, dataReader) diff --git a/weed/security/tls.go b/weed/security/tls.go index 79552c026..bfa9d43c7 100644 --- a/weed/security/tls.go +++ b/weed/security/tls.go @@ -1,24 +1,22 @@ package security import ( - "context" "crypto/tls" "crypto/x509" + "fmt" + "google.golang.org/grpc/credentials/tls/certprovider/pemfile" + "google.golang.org/grpc/security/advancedtls" "io/ioutil" - "os" "strings" - - grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" ) +const credRefreshingInterval = time.Duration(5) * time.Hour + type Authenticator struct { AllowedWildcardDomain string AllowedCommonNames map[string]bool @@ -29,28 +27,39 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption return nil, nil } - // load cert/key, ca cert - cert, err := tls.LoadX509KeyPair(config.GetString(component+".cert"), config.GetString(component+".key")) + serverOptions := pemfile.Options{ + CertFile: config.GetString(component + ".cert"), + KeyFile: config.GetString(component + ".key"), + RefreshDuration: credRefreshingInterval, + } + + serverIdentityProvider, err := pemfile.NewProvider(serverOptions) if err != nil { - glog.V(1).Infof("load cert: %s / key: %s error: %v", - config.GetString(component+".cert"), - config.GetString(component+".key"), - err) + glog.Warningf("pemfile.NewProvider(%v) %v failed: %v", serverOptions, component, err) return nil, nil } - caCert, err := os.ReadFile(config.GetString("grpc.ca")) + + serverRootOptions := pemfile.Options{ + RootFile: config.GetString("grpc.ca"), + RefreshDuration: credRefreshingInterval, + } + serverRootProvider, err := pemfile.NewProvider(serverRootOptions) if err != nil { - glog.V(1).Infof("read ca cert file %s error: %v", config.GetString("grpc.ca"), err) + glog.Warningf("pemfile.NewProvider(%v) failed: %v", serverRootOptions, err) return nil, nil } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - ta := credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{cert}, - ClientCAs: caCertPool, - ClientAuth: tls.RequireAndVerifyClientCert, - }) + // Start a server and create a client using advancedtls API with Provider. + options := &advancedtls.ServerOptions{ + IdentityOptions: advancedtls.IdentityCertificateOptions{ + IdentityProvider: serverIdentityProvider, + }, + RootOptions: advancedtls.RootCertificateOptions{ + RootProvider: serverRootProvider, + }, + RequireClientCert: true, + VType: advancedtls.CertVerification, + } allowedCommonNames := config.GetString(component + ".allowed_commonNames") allowedWildcardDomain := config.GetString("grpc.allowed_wildcard_domain") if allowedCommonNames != "" || allowedWildcardDomain != "" { @@ -62,7 +71,16 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption AllowedCommonNames: allowedCommonNamesMap, AllowedWildcardDomain: allowedWildcardDomain, } - return grpc.Creds(ta), grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(auther.Authenticate)) + options.VerifyPeer = auther.Authenticate + } else { + options.VerifyPeer = func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) { + return &advancedtls.VerificationResults{}, nil + } + } + ta, err := advancedtls.NewServerCreds(options) + if err != nil { + glog.Warningf("advancedtls.NewServerCreds(%v) failed: %v", options, err) + return nil, nil } return grpc.Creds(ta), nil } @@ -77,25 +95,42 @@ func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption { return grpc.WithInsecure() } - // load cert/key, cacert - cert, err := tls.LoadX509KeyPair(certFileName, keyFileName) + clientOptions := pemfile.Options{ + CertFile: certFileName, + KeyFile: keyFileName, + RefreshDuration: credRefreshingInterval, + } + clientProvider, err := pemfile.NewProvider(clientOptions) if err != nil { - glog.V(1).Infof("load cert/key error: %v", err) + glog.Warningf("pemfile.NewProvider(%v) failed %v", clientOptions, err) return grpc.WithInsecure() } - caCert, err := os.ReadFile(caFileName) + clientRootOptions := pemfile.Options{ + RootFile: config.GetString("grpc.ca"), + RefreshDuration: credRefreshingInterval, + } + clientRootProvider, err := pemfile.NewProvider(clientRootOptions) if err != nil { - glog.V(1).Infof("read ca cert file error: %v", err) + glog.Warningf("pemfile.NewProvider(%v) failed: %v", clientRootOptions, err) + return grpc.WithInsecure() + } + options := &advancedtls.ClientOptions{ + IdentityOptions: advancedtls.IdentityCertificateOptions{ + IdentityProvider: clientProvider, + }, + VerifyPeer: func(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) { + return &advancedtls.VerificationResults{}, nil + }, + RootOptions: advancedtls.RootCertificateOptions{ + RootProvider: clientRootProvider, + }, + VType: advancedtls.CertVerification, + } + ta, err := advancedtls.NewClientCreds(options) + if err != nil { + glog.Warningf("advancedtls.NewClientCreds(%v) failed: %v", options, err) return grpc.WithInsecure() } - caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) - - ta := credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - InsecureSkipVerify: true, - }) return grpc.WithTransportCredentials(ta) } @@ -116,27 +151,14 @@ func LoadClientTLSHTTP(clientCertFile string) *tls.Config { } } -func (a Authenticator) Authenticate(ctx context.Context) (newCtx context.Context, err error) { - p, ok := peer.FromContext(ctx) - if !ok { - return ctx, status.Error(codes.Unauthenticated, "no peer found") +func (a Authenticator) Authenticate(params *advancedtls.VerificationFuncParams) (*advancedtls.VerificationResults, error) { + if a.AllowedWildcardDomain != "" && strings.HasSuffix(params.Leaf.Subject.CommonName, a.AllowedWildcardDomain) { + return &advancedtls.VerificationResults{}, nil } - - tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo) - if !ok { - return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials") - } - if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 { - return ctx, status.Error(codes.Unauthenticated, "could not verify peer certificate") - } - - commonName := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName - if a.AllowedWildcardDomain != "" && strings.HasSuffix(commonName, a.AllowedWildcardDomain) { - return ctx, nil - } - if _, ok := a.AllowedCommonNames[commonName]; ok { - return ctx, nil + if _, ok := a.AllowedCommonNames[params.Leaf.Subject.CommonName]; ok { + return &advancedtls.VerificationResults{}, nil } - - return ctx, status.Errorf(codes.Unauthenticated, "invalid subject common name: %s", commonName) + err := fmt.Errorf("Authenticate: invalid subject client common name: %s", params.Leaf.Subject.CommonName) + glog.Error(err) + return nil, err } diff --git a/weed/sequence/snowflake_sequencer_test.go b/weed/sequence/snowflake_sequencer_test.go new file mode 100644 index 000000000..731e330c5 --- /dev/null +++ b/weed/sequence/snowflake_sequencer_test.go @@ -0,0 +1,25 @@ +package sequence + +import ( + "encoding/hex" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSequencer(t *testing.T) { + seq, err := NewSnowflakeSequencer("for_test", 1) + assert.Equal(t, nil, err) + last := uint64(0) + bytes := make([]byte, types.NeedleIdSize) + for i := 0; i < 100; i++ { + next := seq.NextFileId(1) + types.NeedleIdToBytes(bytes, types.NeedleId(next)) + println(hex.EncodeToString(bytes)) + if last == next { + t.Errorf("last %d next %d", last, next) + } + last = next + } + +} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 4d0fbbc41..4f5455cb1 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -263,8 +263,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } ms.deleteClient(clientName) }() - - for _, message := range ms.Topo.ToVolumeLocations() { + for i, message := range ms.Topo.ToVolumeLocations() { + if i == 0 { + if leader, err := ms.Topo.Leader(); err == nil { + message.Leader = string(leader) + } + } if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil { return sendErr } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index bc92dd332..9da947869 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -52,8 +52,13 @@ func (ms *MasterServer) ProcessGrowRequest() { go func() { glog.V(1).Infoln("starting automatic volume grow") start := time.Now() - _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) + newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + if err == nil { + for _, newVidLocation := range newVidLocations { + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation}) + } + } vl.DoneGrowRequest() if req.ErrCh != nil { @@ -204,8 +209,9 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType)) stats := volumeLayout.Stats() + totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 resp := &master_pb.StatisticsResponse{ - TotalSize: stats.TotalSize, + TotalSize: uint64(totalSize), UsedSize: stats.UsedSize, FileCount: stats.FileCount, } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index ade750ccc..47abfb892 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math/rand" "net/http" "strconv" @@ -81,7 +82,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) { err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount()) } else { - count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) + var newVidLocations []*master_pb.VolumeLocation + newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) + count = len(newVidLocations) } } else { err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count")) diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index cc6578bf5..9971eaa48 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -121,7 +121,10 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { if option.RaftBootstrap { os.RemoveAll(path.Join(s.dataDir, ldbFile)) os.RemoveAll(path.Join(s.dataDir, sdbFile)) - os.RemoveAll(path.Join(s.dataDir, "snapshot")) + os.RemoveAll(path.Join(s.dataDir, "snapshots")) + } + if err := os.MkdirAll(path.Join(s.dataDir, "snapshots"), os.ModePerm); err != nil { + return nil, err } baseDir := s.dataDir diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 8c372f0cc..ad0a1c8ce 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -125,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil { + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil { return nil, err } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index e3ec5b724..b4bc850e2 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,6 +3,8 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "io" "math" "os" @@ -78,6 +80,28 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } }() + var preallocateSize int64 + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) + } + if resp.VolumePreallocate { + preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20) + } + return nil + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) + } + + if preallocateSize > 0 { + volumeFile := dataBaseFileName + ".dat" + _, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0) + if err != nil { + return fmt.Errorf("create volume file %s: %v", volumeFile, err) + } + } + // println("source:", volFileInfoResp.String()) copyResponse := &volume_server_pb.VolumeCopyResponse{} reportInterval := int64(1024 * 1024 * 128) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 265dea03a..5140af2b4 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -48,6 +48,13 @@ type WebDavServer struct { Handler *webdav.Handler } +func max(x, y int64) int64 { + if x <= y { + return y + } + return x +} + func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { fs, _ := NewWebDavFileSystem(option) @@ -496,6 +503,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { written, err := f.bufWriter.Write(buf) if err == nil { + f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize))) glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf))) f.off += int64(written) } diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index c892c3443..e15090190 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -108,16 +108,6 @@ func (c *commandRemoteConfigure) Do(args []string, commandEnv *CommandEnv, write remoteConfigureCommand.StringVar(&conf.StorjSecretKey, "storj.secret_key", "", "Storj secret key") remoteConfigureCommand.StringVar(&conf.StorjEndpoint, "storj.endpoint", "", "Storj endpoint") - var namenodes arrayFlags - remoteConfigureCommand.Var(&namenodes, "hdfs.namenodes", "hdfs name node and port, example: namenode1:8020,namenode2:8020") - remoteConfigureCommand.StringVar(&conf.HdfsUsername, "hdfs.username", "", "hdfs user name") - remoteConfigureCommand.StringVar(&conf.HdfsServicePrincipalName, "hdfs.servicePrincipalName", "", `Kerberos service principal name for the namenode - -Example: hdfs/namenode.hadoop.docker -Namenode running as service 'hdfs' with FQDN 'namenode.hadoop.docker'. -`) - remoteConfigureCommand.StringVar(&conf.HdfsDataTransferProtection, "hdfs.dataTransferProtection", "", "[authentication|integrity|privacy] Kerberos data transfer protection") - if err = remoteConfigureCommand.Parse(args); err != nil { return nil } @@ -223,14 +213,3 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write return nil } - -type arrayFlags []string - -func (i *arrayFlags) String() string { - return "my string representation" -} - -func (i *arrayFlags) Set(value string) error { - *i = append(*i, value) - return nil -} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 289fd3b47..847324838 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -44,13 +44,11 @@ func GenerateDirUuid(dir string) (dirUuidString string, err error) { dirUuidString = dirUuid.String() writeErr := util.WriteFile(fileName, []byte(dirUuidString), 0644) if writeErr != nil { - glog.Warningf("failed to write uuid to %s : %v", fileName, writeErr) return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, writeErr) } } else { uuidData, readErr := os.ReadFile(fileName) if readErr != nil { - glog.Warningf("failed to read uuid from %s : %v", fileName, readErr) return "", fmt.Errorf("failed to read uuid from %s : %v", fileName, readErr) } dirUuidString = string(uuidData) @@ -65,7 +63,10 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpace util.MinFreeSp } else { idxDir = util.ResolvePath(idxDir) } - dirUuid, _ := GenerateDirUuid(dir) + dirUuid, err := GenerateDirUuid(dir) + if err != nil { + glog.Fatalf("cannot generate uuid of dir %s: %v", dir, err) + } location := &DiskLocation{ Directory: dir, DirectoryUuid: dirUuid, diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 199cb26b3..afe12ee72 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -2,10 +2,25 @@ package needle_map import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/sequence" . "github.com/chrislusf/seaweedfs/weed/storage/types" "testing" ) +func TestSnowflakeSequencer(t *testing.T) { + m := NewCompactMap() + seq, _ := sequence.NewSnowflakeSequencer("for_test", 1) + + for i := 0; i < 200000; i++ { + id := seq.NextFileId(1) + oldOffset, oldSize := m.Set(NeedleId(id), ToOffset(8), 3000073) + if oldSize != 0 { + t.Errorf("id %d oldOffset %v oldSize %d", id, oldOffset, oldSize) + } + } + +} + func TestOverflow2(t *testing.T) { m := NewCompactMap() _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073) diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 147220f4a..e53aa2853 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -22,7 +22,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne errCount := int32(0) for index, dn := range locationlist.list { go func(index int, url pb.ServerAddress, vid needle.VolumeId) { - err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{ VolumeId: uint32(vid), }) @@ -123,7 +123,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V isReadOnly := false for _, dn := range locationlist.list { glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) @@ -150,7 +150,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) { for _, dn := range locationlist.list { glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url()) - err := operation.WithVolumeServerClient(true, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{ VolumeId: uint32(vid), }) diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index 7886c3998..238ca99f4 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -3,6 +3,7 @@ package topology import ( "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math/rand" "sync" @@ -77,42 +78,50 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { return } -func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (count int, err error) { +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount int) (result []*master_pb.VolumeLocation, err error) { if targetCount == 0 { targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()) } - count, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) - if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { - return count, nil + result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo) + if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 { + return result, nil } - return count, err + return result, err } -func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { +func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount int, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) { vg.accessLock.Lock() defer vg.accessLock.Unlock() for i := 0; i < targetCount; i++ { - if c, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { - counter += c + if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil { + result = append(result, res...) } else { - glog.V(0).Infof("create %d volume, created %d: %v", targetCount, counter, e) - return counter, e + glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e) + return result, e } } return } -func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (int, error) { +func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) { servers, e := vg.findEmptySlotsForOneVolume(topo, option) if e != nil { - return 0, e + return nil, e } vid, raftErr := topo.NextVolumeId() if raftErr != nil { - return 0, raftErr + return nil, raftErr } - err := vg.grow(grpcDialOption, topo, vid, option, servers...) - return len(servers), err + if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil { + for _, server := range servers { + result = append(result, &master_pb.VolumeLocation{ + Url: server.Url(), + PublicUrl: server.PublicUrl, + NewVids: []uint32{uint32(vid)}, + }) + } + } + return } // 1. find the main data node diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 167aee8ea..dee82762a 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -493,9 +493,9 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { for vid, vll := range vl.vid2location { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) - ret.UsedSize += size + ret.UsedSize += size * uint64(vll.Length()) if vl.readonlyVolumes.IsTrue(vid) { - ret.TotalSize += size + ret.TotalSize += size * uint64(vll.Length()) } else { ret.TotalSize += vl.volumeSizeLimit * uint64(vll.Length()) } diff --git a/weed/util/constants.go b/weed/util/constants.go index c0fea8b17..ce8757ce9 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION_NUMBER = fmt.Sprintf("%.02f", 3.11) + VERSION_NUMBER = fmt.Sprintf("%.02f", 3.13) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" ) diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 244a3921a..35f1c4cf8 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -38,6 +38,39 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy } } +func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + return mc.LookupFileIdWithFallback +} + +func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) { + fullUrls, err = mc.vidMap.LookupFileId(fileId) + if err == nil { + return + } + err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: []string{fileId}, + }) + if err != nil { + return err + } + for vid, vidLocation := range resp.VolumeIdLocations { + for _, vidLoc := range vidLocation.Locations { + loc := Location{ + Url: vidLoc.Url, + PublicUrl: vidLoc.PublicUrl, + GrpcPort: int(vidLoc.GrpcPort), + } + mc.vidMap.addLocation(uint32(vid), loc) + fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId) + } + } + + return nil + }) + return +} + func (mc *MasterClient) GetMaster() pb.ServerAddress { mc.WaitUntilConnected() return mc.currentMaster @@ -98,7 +131,6 @@ func (mc *MasterClient) tryAllMasters() { } mc.currentMaster = "" - mc.vidMap = newVidMap("") } } @@ -126,9 +158,25 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc() return err } - glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master) + + resp, err := stream.Recv() + if err != nil { + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + return err + } + + // check if it is the leader to determine whether to reset the vidMap + if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { + glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) + nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) + stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() + return nil + } + mc.currentMaster = master + mc.vidMap = newVidMap("") for { resp, err := stream.Recv() @@ -140,8 +188,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.VolumeLocation != nil { // maybe the leader is changed - if resp.VolumeLocation.Leader != "" { - glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader) + if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader { + glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() return nil diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index f7a9a0f1a..754c77051 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -90,10 +90,6 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er return } -func (vc *vidMap) GetLookupFileIdFunction() LookupFileIdFunctionType { - return vc.LookupFileId -} - func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { |
