aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/depsreview.yml2
-rw-r--r--go.mod14
-rw-r--r--go.sum30
-rw-r--r--unmaintained/diff_volume_servers/diff_volume_servers.go9
-rw-r--r--unmaintained/see_idx/see_idx.go5
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go2
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go16
-rw-r--r--weed/filer/leveldb3/leveldb3_store_bucket.go23
-rw-r--r--weed/filer/meta_aggregator.go7
-rw-r--r--weed/filer/mysql2/mysql2_store.go2
-rw-r--r--weed/filer/postgres2/postgres2_store.go2
-rw-r--r--weed/filer/stream.go22
-rw-r--r--weed/filer/ydb/ydb_store.go2
-rw-r--r--weed/iamapi/iamapi_management_handlers.go13
-rw-r--r--weed/mount/weedfs_rename.go2
-rw-r--r--weed/server/master_server.go19
-rw-r--r--weed/server/master_server_handlers_admin.go2
-rw-r--r--weed/server/master_server_handlers_ui.go4
-rw-r--r--weed/shell/command_volume_configure_replication.go46
-rw-r--r--weed/shell/command_volume_server_evacuate.go129
-rw-r--r--weed/shell/command_volume_server_evacuate_test.go7
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go2
-rw-r--r--weed/storage/idx/walk.go4
-rw-r--r--weed/storage/needle_map/memdb.go2
-rw-r--r--weed/storage/needle_map_leveldb.go89
-rw-r--r--weed/storage/needle_map_memory.go2
-rw-r--r--weed/topology/data_center.go23
-rw-r--r--weed/topology/data_node.go26
-rw-r--r--weed/topology/rack.go24
-rw-r--r--weed/topology/topology_info.go (renamed from weed/topology/topology_map.go)40
-rw-r--r--weed/topology/volume_layout.go18
-rw-r--r--weed/util/retry.go2
-rw-r--r--weed/wdclient/masterclient.go6
33 files changed, 405 insertions, 191 deletions
diff --git a/.github/workflows/depsreview.yml b/.github/workflows/depsreview.yml
index b84b27d15..e13bacb2a 100644
--- a/.github/workflows/depsreview.yml
+++ b/.github/workflows/depsreview.yml
@@ -11,4 +11,4 @@ jobs:
- name: 'Checkout Repository'
uses: actions/checkout@dcd71f646680f2efd8db4afa5ad64fdcba30e748
- name: 'Dependency Review'
- uses: actions/dependency-review-action@1c59cdf2a9c7f29c90e8da32237eb04b81bad9f0
+ uses: actions/dependency-review-action@94145f3150bfabdc97540cbd5f7e926306ea7744
diff --git a/go.mod b/go.mod
index fc956e42a..90ec2acf5 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/OneOfOne/xxhash v1.2.8
github.com/Shopify/sarama v1.34.1
- github.com/aws/aws-sdk-go v1.44.51
+ github.com/aws/aws-sdk-go v1.44.56
github.com/beorn7/perks v1.0.1 // indirect
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/bwmarrin/snowflake v0.3.0
@@ -37,7 +37,6 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redsync/redsync/v4 v4.5.1
github.com/go-sql-driver/mysql v1.6.0
- github.com/go-stack/stack v1.8.1 // indirect
github.com/go-zookeeper/zk v1.0.2 // indirect
github.com/gocql/gocql v0.0.0-20210707082121-9a3953d1826d
github.com/golang-jwt/jwt v3.2.2+incompatible
@@ -113,12 +112,12 @@ require (
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.etcd.io/etcd/client/v3 v3.5.4
- go.mongodb.org/mongo-driver v1.9.1
+ go.mongodb.org/mongo-driver v1.10.0
go.opencensus.io v0.23.0 // indirect
gocloud.dev v0.25.0
gocloud.dev/pubsub/natspubsub v0.25.0
gocloud.dev/pubsub/rabbitpubsub v0.25.0
- golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
+ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/exp v0.0.0-20220414153411-bcd21879b8fd
golang.org/x/image v0.0.0-20200119044424-58c23975cae1
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e
@@ -127,10 +126,10 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
- google.golang.org/api v0.86.0
+ google.golang.org/api v0.87.0
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f // indirect
- google.golang.org/grpc v1.47.0
+ google.golang.org/grpc v1.48.0
google.golang.org/protobuf v1.28.0
gopkg.in/inf.v0 v0.9.1 // indirect
modernc.org/b v1.0.0 // indirect
@@ -154,7 +153,7 @@ require (
github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
github.com/tikv/client-go/v2 v2.0.1
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2
- github.com/ydb-platform/ydb-go-sdk/v3 v3.28.0
+ github.com/ydb-platform/ydb-go-sdk/v3 v3.28.3
google.golang.org/grpc/security/advancedtls v0.0.0-20220622233350-5cdb09fa29c1
)
@@ -200,6 +199,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/mattn/go-sqlite3 v2.0.1+incompatible // indirect
+ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 22a9a1ac4..fe524ac66 100644
--- a/go.sum
+++ b/go.sum
@@ -157,8 +157,8 @@ github.com/armon/go-metrics v0.3.10/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb
github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
-github.com/aws/aws-sdk-go v1.44.51 h1:jO9hoLynZOrMM4dj0KjeKIK+c6PA+HQbKoHOkAEye2Y=
-github.com/aws/aws-sdk-go v1.44.51/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/aws/aws-sdk-go v1.44.56 h1:bT+lExwagH7djxb6InKUVkEKGPAj5aAPnV85/m1fKro=
+github.com/aws/aws-sdk-go v1.44.56/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM=
@@ -346,8 +346,6 @@ github.com/go-redsync/redsync/v4 v4.5.1/go.mod h1:AfhgO1E6W3rlUTs6Zmz/B6qBZJFasV
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
-github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw=
-github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
@@ -703,6 +701,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -910,10 +909,8 @@ github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlV
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
-github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
-github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e h1:9LPdmD1vqadsDQUva6t2O9MbnyvoOgo8nFNPaOIH5U8=
@@ -924,8 +921,8 @@ github.com/ydb-platform/ydb-go-genproto v0.0.0-20220531094121-36ca6bddb9f7/go.mo
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2 h1:EYSI1kulnHb0H0zt3yOw4cRj4ABMSMGwNe43D+fX7e4=
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.1.2/go.mod h1:Xfjce+VMU9yJVr1lj60yK2fFPWjB4jr/4cp3K7cjzi4=
github.com/ydb-platform/ydb-go-sdk/v3 v3.25.3/go.mod h1:PFizF/vJsdAgEwjK3DVSBD52kdmRkWfSIS2q2pA+e88=
-github.com/ydb-platform/ydb-go-sdk/v3 v3.28.0 h1:F394Kkl+QPLrl0+fWpoSafdfgGqQCZOyzvXLxzVUWfs=
-github.com/ydb-platform/ydb-go-sdk/v3 v3.28.0/go.mod h1:vXjmbeEAWlkVE5/ym3XHhtnWk7aDGGqFMKrfgwbRUkQ=
+github.com/ydb-platform/ydb-go-sdk/v3 v3.28.3 h1:bD3Xfj8XUby/rbl6hUye96eKApSmNQ8/vYMImd6W9Dc=
+github.com/ydb-platform/ydb-go-sdk/v3 v3.28.3/go.mod h1:bsYHcRuCdelVeIwNsJicIz60flewCwp8Kg9gfwMPR/Q=
github.com/ydb-platform/ydb-go-yc v0.8.3 h1:92UUUMsfvtMl6mho8eQ9lbkiPrF3a9CT+RrVRAKNRwo=
github.com/ydb-platform/ydb-go-yc v0.8.3/go.mod h1:zUolAFGzJ5XG8uwiseTLr9Lapm7L7hdVdZgLSuv9FXE=
github.com/ydb-platform/ydb-go-yc-metadata v0.5.2 h1:nMtixUijP0Z7iHJNT9fOL+dbmEzZxqU6Xk87ll7hqXg=
@@ -947,8 +944,8 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3
go.etcd.io/etcd/client/v3 v3.5.2/go.mod h1:kOOaWFFgHygyT0WlSmL8TJiXmMysO/nNUlEsSsN6W4o=
go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4=
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
-go.mongodb.org/mongo-driver v1.9.1 h1:m078y9v7sBItkt1aaoe2YlvWEXcD263e1a4E1fBrJ1c=
-go.mongodb.org/mongo-driver v1.9.1/go.mod h1:0sQWfOeY63QTntERDJJ/0SuKK0T1uVSgKCuAROlKEPY=
+go.mongodb.org/mongo-driver v1.10.0 h1:UtV6N5k14upNp4LTduX0QCufG124fSu25Wz9tu94GLg=
+go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -1006,7 +1003,6 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
-golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@@ -1017,8 +1013,8 @@ golang.org/x/crypto v0.0.0-20211115234514-b4de73f9ece8/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220331220935-ae2d96664a29/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
-golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -1296,7 +1292,6 @@ golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
@@ -1412,8 +1407,8 @@ google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6r
google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg=
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
-google.golang.org/api v0.86.0 h1:ZAnyOHQFIuWso1BodVfSaRyffD74T9ERGFa3k1fNk/U=
-google.golang.org/api v0.86.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
+google.golang.org/api v0.87.0 h1:pUQVF/F+X7Tl1lo4LJoJf5BOpjtmINU80p9XpYTU2p4=
+google.golang.org/api v0.87.0/go.mod h1:+Sem1dnrKlrXMR/X0bPnMWyluQe4RsNoYfmNLhOIkzw=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1554,8 +1549,9 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
-google.golang.org/grpc v1.47.0 h1:9n77onPX5F3qfFCqjy9dhn8PbNQsIKeVU04J9G7umt8=
google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
+google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
+google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b h1:NuxyvVZoDfHZwYW9LD4GJiF5/nhiSyP4/InTrvw9Ibk=
google.golang.org/grpc/examples v0.0.0-20201112215255-90f1b3ee835b/go.mod h1:IBqQ7wSUJ2Ep09a8rMWFsg4fmI2r38zwsq8a0GgxXpM=
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go
index 0188d18d4..815eeae54 100644
--- a/unmaintained/diff_volume_servers/diff_volume_servers.go
+++ b/unmaintained/diff_volume_servers/diff_volume_servers.go
@@ -6,6 +6,10 @@ import (
"errors"
"flag"
"fmt"
+ "io"
+ "math"
+ "os"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -16,9 +20,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
- "math"
- "os"
)
var (
@@ -155,7 +156,7 @@ func getVolumeFiles(v uint32, addr pb.ServerAddress) (map[types.NeedleId]needleS
var maxOffset int64
files := map[types.NeedleId]needleState{}
- err = idx.WalkIndexFile(idxFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
+ err = idx.WalkIndexFile(idxFile, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if offset.IsZero() || size.IsDeleted() {
files[key] = needleState{
state: stateDeleted,
diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go
index 22c659351..616263b1c 100644
--- a/unmaintained/see_idx/see_idx.go
+++ b/unmaintained/see_idx/see_idx.go
@@ -3,11 +3,12 @@ package main
import (
"flag"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
"os"
"path"
"strconv"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -36,7 +37,7 @@ func main() {
}
defer indexFile.Close()
- idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
+ idx.WalkIndexFile(indexFile, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
fmt.Printf("key:%v offset:%v size:%v(%v)\n", key, offset, size, util.BytesToHumanReadable(uint64(size)))
return nil
})
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 13268b944..a159d5272 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -32,6 +32,8 @@ type AbstractSqlStore struct {
dbsLock sync.Mutex
}
+var _ filer.BucketAware = (*AbstractSqlStore)(nil)
+
func (store *AbstractSqlStore) CanDropWholeBucket() bool {
return store.SupportBucketTable
}
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index d21515bd4..8da4a9e7f 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -121,23 +121,31 @@ func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bo
}
store.dbsLock.RUnlock()
- // upgrade to write lock
+
+ db, err := store.createDB(bucket)
+
+ return db, bucket, shortPath, err
+}
+
+func (store *LevelDB3Store) createDB(bucket string) (*leveldb.DB, error) {
+
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
// double check after getting the write lock
if db, found := store.dbs[bucket]; found {
- return db, bucket, shortPath, nil
+ return db, nil
}
// create db
db, err := store.loadDB(bucket)
if err != nil {
- return nil, bucket, shortPath, err
+ return nil, err
}
+
store.dbs[bucket] = db
- return db, bucket, shortPath, nil
+ return db, nil
}
func (store *LevelDB3Store) closeDB(bucket string) {
diff --git a/weed/filer/leveldb3/leveldb3_store_bucket.go b/weed/filer/leveldb3/leveldb3_store_bucket.go
new file mode 100644
index 000000000..823fe363b
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_bucket.go
@@ -0,0 +1,23 @@
+package leveldb
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "os"
+)
+
+var _ filer.BucketAware = (*LevelDB3Store)(nil)
+
+func (store *LevelDB3Store) OnBucketCreation(bucket string) {
+ store.createDB(bucket)
+}
+
+func (store *LevelDB3Store) OnBucketDeletion(bucket string) {
+ store.closeDB(bucket)
+ if bucket != "" { // just to make sure
+ os.RemoveAll(store.dir + "/" + bucket)
+ }
+}
+
+func (store *LevelDB3Store) CanDropWholeBucket() bool {
+ return true
+}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index c672ce342..5799e247e 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
+ "strings"
"sync"
"time"
@@ -99,7 +100,11 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
return
}
if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
+ errLvl := glog.Level(0)
+ if strings.Contains(err.Error(), "duplicated local subscription detected") {
+ errLvl = glog.Level(1)
+ }
+ glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}
if lastTsNs < nextLastTsNs {
lastTsNs = nextLastTsNs
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index e50480150..792c79e44 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -18,6 +18,8 @@ const (
CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
)
+var _ filer.BucketAware = (*MysqlStore2)(nil)
+
func init() {
filer.Stores = append(filer.Stores, &MysqlStore2{})
}
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
index 0f573d8d0..3c57e4cb4 100644
--- a/weed/filer/postgres2/postgres2_store.go
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -17,6 +17,8 @@ const (
CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
)
+var _ filer.BucketAware = (*PostgresStore2)(nil)
+
func init() {
filer.Stores = append(filer.Stores, &PostgresStore2{})
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 7da9fd0a0..d1b66e88d 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -18,6 +18,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
+var getLookupFileIdBackoffSchedule = []time.Duration{
+ 150 * time.Millisecond,
+ 600 * time.Millisecond,
+ 1800 * time.Millisecond,
+}
+
func HasData(entry *filer_pb.Entry) bool {
if len(entry.Content) > 0 {
@@ -69,14 +75,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
-
- urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ var urlStrings []string
+ var err error
+ for _, backoff := range getLookupFileIdBackoffSchedule {
+ urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ if err == nil && len(urlStrings) > 0 {
+ time.Sleep(backoff)
+ break
+ }
+ }
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
} else if len(urlStrings) == 0 {
- glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
- return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ glog.Error(errUrlNotFound)
+ return errUrlNotFound
}
fileId2Url[chunkView.FileId] = urlStrings
}
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index 1e3a55a09..d5751bb5a 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -320,6 +320,8 @@ func (store *YdbStore) Shutdown() {
_ = store.DB.Close(context.Background())
}
+var _ filer.BucketAware = (*YdbStore)(nil)
+
func (store *YdbStore) CanDropWholeBucket() bool {
return store.SupportBucketTable
}
diff --git a/weed/iamapi/iamapi_management_handlers.go b/weed/iamapi/iamapi_management_handlers.go
index e1f215bd3..8a42aa936 100644
--- a/weed/iamapi/iamapi_management_handlers.go
+++ b/weed/iamapi/iamapi_management_handlers.go
@@ -219,8 +219,16 @@ func (iama *IamApiServer) PutUserPolicy(s3cfg *iam_pb.S3ApiConfiguration, values
if userName != ident.Name {
continue
}
+
+ existedActions := make(map[string]bool, len(ident.Actions))
+ for _, action := range ident.Actions {
+ existedActions[action] = true
+ }
+
for _, action := range actions {
- ident.Actions = append(ident.Actions, action)
+ if !existedActions[action] {
+ ident.Actions = append(ident.Actions, action)
+ }
}
return resp, nil
}
@@ -349,7 +357,8 @@ func (iama *IamApiServer) CreateAccessKey(s3cfg *iam_pb.S3ApiConfiguration, valu
}
if !changed {
s3cfg.Identities = append(s3cfg.Identities,
- &iam_pb.Identity{Name: userName,
+ &iam_pb.Identity{
+ Name: userName,
Credentials: []*iam_pb.Credential{
{
AccessKey: accessKeyId,
diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go
index 0c7de0bbb..538cfead7 100644
--- a/weed/mount/weedfs_rename.go
+++ b/weed/mount/weedfs_rename.go
@@ -235,7 +235,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
sourceInode, targetInode := wfs.inodeToPath.MovePath(oldPath, newPath)
if sourceInode != 0 {
- if fh, foundFh := wfs.fhmap.inode2fh[sourceInode]; foundFh && fh.entry != nil {
+ if fh, foundFh := wfs.fhmap.FindFileHandle(sourceInode); foundFh && fh.entry != nil {
fh.entry.Name = newName
}
// invalidate attr and data
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9bf840f08..0fdc3944f 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -65,8 +65,8 @@ type MasterServer struct {
boundedLeaderChan chan int
- onPeerUpdatDoneCn chan string
- onPeerUpdatDoneCnExist bool
+ onPeerUpdateDoneCn chan string
+ onPeerUpdateDoneCnExist bool
// notifying clients
clientChansLock sync.RWMutex
@@ -118,7 +118,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
- ms.onPeerUpdatDoneCn = make(chan string)
+ ms.onPeerUpdateDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
@@ -366,14 +366,15 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
}
- if ms.onPeerUpdatDoneCnExist {
- ms.onPeerUpdatDoneCn <- peerName
+ if ms.onPeerUpdateDoneCnExist {
+ ms.onPeerUpdateDoneCn <- peerName
}
} else if isLeader {
go func(peerName string) {
+ raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
for {
select {
- case <-time.After(RaftServerRemovalTime):
+ case <-raftServerRemovalTimeAfter:
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
Id: peerName,
@@ -384,14 +385,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
if err != nil {
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
}
+ glog.V(0).Infof("old raft server %s removed", peerName)
return
- case peerDone := <-ms.onPeerUpdatDoneCn:
+ case peerDone := <-ms.onPeerUpdateDoneCn:
if peerName == peerDone {
+ glog.V(0).Infof("raft server %s remove canceled", peerName)
return
}
}
}
}(peerName)
- ms.onPeerUpdatDoneCnExist = true
+ ms.onPeerUpdateDoneCnExist = true
}
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 47abfb892..0c85db791 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -48,7 +48,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.Version()
- m["Topology"] = ms.Topo.ToMap()
+ m["Topology"] = ms.Topo.ToInfo()
writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index d8260d8d2..f09022399 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -26,7 +26,7 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
VolumeSizeLimitMB uint32
}{
util.Version(),
- ms.Topo.ToMap(),
+ ms.Topo.ToInfo(),
ms.Topo.RaftServer,
infos,
serverStats,
@@ -43,7 +43,7 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
VolumeSizeLimitMB uint32
}{
util.Version(),
- ms.Topo.ToMap(),
+ ms.Topo.ToInfo(),
ms.Topo.HashicorpRaft,
infos,
serverStats,
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 610986489..b07d58083 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -68,45 +68,39 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
volumeFilter := getVolumeFilter(replicaPlacement, uint32(vid), *collectionPattern)
// find all data nodes with volumes that needs replication change
- var allLocations []location
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- loc := newLocation(dc, string(rack), dn)
+ var targetVolumeIds []uint32
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
if volumeFilter(v) {
- allLocations = append(allLocations, loc)
- continue
+ targetVolumeIds = append(targetVolumeIds, v.Id)
}
}
}
- })
-
- if len(allLocations) == 0 {
- return fmt.Errorf("no volume needs change")
- }
-
- for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
- VolumeId: uint32(vid),
- Replication: replicaPlacement.String(),
- })
- if configureErr != nil {
- return configureErr
- }
- if resp.Error != "" {
- return errors.New(resp.Error)
+ if len(targetVolumeIds) == 0 {
+ return
+ }
+ err = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ for _, targetVolumeId := range targetVolumeIds {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
+ VolumeId: targetVolumeId,
+ Replication: replicaPlacement.String(),
+ })
+ if configureErr != nil {
+ return configureErr
+ }
+ if resp.Error != "" {
+ return errors.New(resp.Error)
+ }
}
return nil
})
-
if err != nil {
- return err
+ return
}
+ })
- }
-
- return nil
+ return err
}
func getVolumeFilter(replicaPlacement *super_block.ReplicaPlacement, volumeId uint32, collectionPattern string) func(message *master_pb.VolumeInformationMessage) bool {
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index ffbee0302..f72d73230 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -18,7 +18,9 @@ func init() {
}
type commandVolumeServerEvacuate struct {
+ topologyInfo *master_pb.TopologyInfo
targetServer string
+ volumeRack string
}
func (c *commandVolumeServerEvacuate) Name() string {
@@ -47,7 +49,8 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
- c.targetServer = *vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
+ volumeRack := vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
+ targetServer := vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
@@ -56,12 +59,18 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
}
infoAboutSimulationMode(writer, *applyChange, "-force")
- if err = commandEnv.confirmIsLocked(args); err != nil {
+ if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
return
}
- if *volumeServer == "" {
- return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
+ if *volumeServer == "" && *volumeRack == "" {
+ return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
+ }
+ if *targetServer != "" {
+ c.targetServer = *targetServer
+ }
+ if *volumeRack != "" {
+ c.volumeRack = *volumeRack
}
for i := 0; i < *retryCount+1; i++ {
if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
@@ -80,44 +89,59 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
// list all the volumes
// collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+ c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
- if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
- if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
return nil
}
-func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server
- volumeServers := collectVolumeServersByDc(topologyInfo, "")
- thisNode, otherNodes := nodesOtherThan(volumeServers, volumeServer)
- if thisNode == nil {
+ volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
+ thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
+ if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster", volumeServer)
}
// move away normal volumes
- volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
- for _, diskInfo := range thisNode.info.DiskInfos {
- for _, vol := range diskInfo.VolumeInfos {
- hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
- if err != nil {
- return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err)
- }
- if !hasMoved {
- if skipNonMoveable {
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
- fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
+ for _, thisNode := range thisNodes {
+ for _, diskInfo := range thisNode.info.DiskInfos {
+ if applyChange {
+ if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
+ fmt.Fprintf(writer, "update topologyInfo %v", err)
} else {
- return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
+ _, otherNodesNew := c.nodesOtherThan(
+ collectVolumeServersByDc(topologyInfo, ""), volumeServer)
+ if len(otherNodesNew) > 0 {
+ otherNodes = otherNodesNew
+ c.topologyInfo = topologyInfo
+ fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
+ }
+ }
+ }
+ volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
+ for _, vol := range diskInfo.VolumeInfos {
+ hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
+ if err != nil {
+ fmt.Fprintf(writer, "move away volume %d from %s: %v", vol.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
+ fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
+ } else {
+ return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
+ }
}
}
}
@@ -125,26 +149,28 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
return nil
}
-func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
- ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
- thisNode, otherNodes := ecNodesOtherThan(ecNodes, volumeServer)
- if thisNode == nil {
+ ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
+ thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
+ if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
}
// move away ec volumes
- for _, diskInfo := range thisNode.info.DiskInfos {
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
- if err != nil {
- return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
- }
- if !hasMoved {
- if skipNonMoveable {
- fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
- } else {
- return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
+ for _, thisNode := range thisNodes {
+ for _, diskInfo := range thisNode.info.DiskInfos {
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
+ if err != nil {
+ fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
+ }
+ if !hasMoved {
+ if skipNonMoveable {
+ fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
+ } else {
+ return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
+ }
}
}
}
@@ -160,9 +186,6 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
})
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
- if c.targetServer != "" && c.targetServer != emptyNode.info.Id {
- continue
- }
collectionPrefix := ""
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
@@ -207,10 +230,16 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][
return
}
-func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, otherNodes []*Node) {
+func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
for _, node := range volumeServers {
- if node.info.Id == thisServer {
- thisNode = node
+ if node.info.Id == thisServer || (c.volumeRack != "" && node.rack == c.volumeRack) {
+ thisNodes = append(thisNodes, node)
+ continue
+ }
+ if c.volumeRack != "" && c.volumeRack == node.rack {
+ continue
+ }
+ if c.targetServer != "" && c.targetServer != node.info.Id {
continue
}
otherNodes = append(otherNodes, node)
@@ -218,10 +247,16 @@ func nodesOtherThan(volumeServers []*Node, thisServer string) (thisNode *Node, o
return
}
-func ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNode *EcNode, otherNodes []*EcNode) {
+func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
for _, node := range volumeServers {
- if node.info.Id == thisServer {
- thisNode = node
+ if node.info.Id == thisServer || (c.volumeRack != "" && string(node.rack) == c.volumeRack) {
+ thisNodes = append(thisNodes, node)
+ continue
+ }
+ if c.volumeRack != "" && c.volumeRack == string(node.rack) {
+ continue
+ }
+ if c.targetServer != "" && c.targetServer != node.info.Id {
continue
}
otherNodes = append(otherNodes, node)
diff --git a/weed/shell/command_volume_server_evacuate_test.go b/weed/shell/command_volume_server_evacuate_test.go
index 2cdb94a60..4563f38ba 100644
--- a/weed/shell/command_volume_server_evacuate_test.go
+++ b/weed/shell/command_volume_server_evacuate_test.go
@@ -6,12 +6,11 @@ import (
)
func TestVolumeServerEvacuate(t *testing.T) {
- topologyInfo := parseOutput(topoData)
+ c := commandVolumeServerEvacuate{}
+ c.topologyInfo = parseOutput(topoData)
volumeServer := "192.168.1.4:8080"
-
- c := commandVolumeServerEvacuate{}
- if err := c.evacuateNormalVolumes(nil, topologyInfo, volumeServer, true, false, os.Stdout); err != nil {
+ if err := c.evacuateNormalVolumes(nil, volumeServer, true, false, os.Stdout); err != nil {
t.Errorf("evacuate: %v", err)
}
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 157149865..ea331ca39 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -294,7 +294,7 @@ func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
defer indexFile.Close()
cm := needle_map.NewMemDb()
- err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
+ err = idx.WalkIndexFile(indexFile, 0, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
} else {
diff --git a/weed/storage/idx/walk.go b/weed/storage/idx/walk.go
index 5215d3c4f..70d3855ea 100644
--- a/weed/storage/idx/walk.go
+++ b/weed/storage/idx/walk.go
@@ -9,8 +9,8 @@ import (
// walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function
-func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
- var readerOffset int64
+func WalkIndexFile(r io.ReaderAt, startFrom uint64, fn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
+ readerOffset := int64(startFrom * types.NeedleMapEntrySize)
bytes := make([]byte, types.NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
if count == 0 && e == io.EOF {
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index ba1fd3d1e..a362a85ae 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -111,7 +111,7 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
- return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
+ return idx.WalkIndexFile(readerAt, 0, func(key NeedleId, offset Offset, size Size) error {
if offset.IsZero() || size.IsDeleted() {
return cm.Delete(key)
}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index 31c86d124..a1934b8f1 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -9,6 +9,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
@@ -17,10 +19,16 @@ import (
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
+//mark it every watermarkBatchSize operations
+const watermarkBatchSize = 10000
+
+var watermarkKey = []byte("idx_entry_watermark")
+
type LevelDbNeedleMap struct {
baseNeedleMapper
- dbFileName string
- db *leveldb.DB
+ dbFileName string
+ db *leveldb.DB
+ recordCount uint64
}
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) {
@@ -46,7 +54,14 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
return
}
}
- glog.V(1).Infof("Loading %s...", indexFile.Name())
+ glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
+ m.recordCount = uint64(m.indexFileOffset / types.NeedleMapEntrySize)
+ watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
+ err = setWatermark(m.db, watermark)
+ if err != nil {
+ glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
+ return
+ }
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
if indexLoadError != nil {
return nil, indexLoadError
@@ -78,9 +93,20 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err
}
defer db.Close()
- return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
+
+ watermark := getWatermark(db)
+ if stat, err := indexFile.Stat(); err != nil {
+ glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
+ return err
+ } else {
+ if watermark*types.NeedleMapEntrySize > uint64(stat.Size()) {
+ glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
+ }
+ glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*types.NeedleMapEntrySize)/types.NeedleMapEntrySize)
+ }
+ return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
if !offset.IsZero() && size.IsValid() {
- levelDbWrite(db, key, offset, size)
+ levelDbWrite(db, key, offset, size, false, 0)
} else {
levelDbDelete(db, key)
}
@@ -102,6 +128,7 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
var oldSize Size
+ var watermark uint64
if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size
}
@@ -110,16 +137,54 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
if err := m.appendToIndexFile(key, offset, size); err != nil {
return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
}
- return levelDbWrite(m.db, key, offset, size)
+ m.recordCount++
+ if m.recordCount%watermarkBatchSize != 0 {
+ watermark = 0
+ } else {
+ watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
+ glog.V(1).Infof("put cnt:%d for %s,watermark: %d", m.recordCount, m.dbFileName, watermark)
+ }
+ return levelDbWrite(m.db, key, offset, size, watermark == 0, watermark)
+}
+
+func getWatermark(db *leveldb.DB) uint64 {
+ data, err := db.Get(watermarkKey, nil)
+ if err != nil || len(data) != 8 {
+ glog.Warningf("get watermark from db error: %v, %d", err, len(data))
+ /*
+ if !strings.Contains(strings.ToLower(err.Error()), "not found") {
+ err = setWatermark(db, 0)
+ if err != nil {
+ glog.Errorf("failed to set watermark: %v", err)
+ }
+ }
+ */
+ return 0
+ }
+ return util.BytesToUint64(data)
}
-func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size) error {
+func setWatermark(db *leveldb.DB, watermark uint64) error {
+ glog.V(1).Infof("set watermark %d", watermark)
+ var wmBytes = make([]byte, 8)
+ util.Uint64toBytes(wmBytes, watermark)
+ if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
+ return fmt.Errorf("failed to setWatermark: %v", err)
+ }
+ return nil
+}
+
+func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
bytes := needle_map.ToBytes(key, offset, size)
if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
return fmt.Errorf("failed to write leveldb: %v", err)
}
+ // set watermark
+ if updateWatermark {
+ return setWatermark(db, watermark)
+ }
return nil
}
func levelDbDelete(db *leveldb.DB, key NeedleId) error {
@@ -129,6 +194,7 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
+ var watermark uint64
oldNeedle, found := m.Get(key)
if !found || oldNeedle.Size.IsDeleted() {
return nil
@@ -139,8 +205,13 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
-
- return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size)
+ m.recordCount++
+ if m.recordCount%watermarkBatchSize != 0 {
+ watermark = 0
+ } else {
+ watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
+ }
+ return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark == 0, watermark)
}
func (m *LevelDbNeedleMap) Close() {
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index 1b58708c6..4c7909dbd 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -33,7 +33,7 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
}
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
- e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
+ e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error {
nm.MaybeSetMaxFileKey(key)
if !offset.IsZero() && size.IsValid() {
nm.FileCounter++
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
index 60d91ba6d..78c23e748 100644
--- a/weed/topology/data_center.go
+++ b/weed/topology/data_center.go
@@ -2,6 +2,7 @@ package topology
import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "golang.org/x/exp/slices"
)
type DataCenter struct {
@@ -30,16 +31,24 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
return rack
}
-func (dc *DataCenter) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Id"] = dc.Id()
- var racks []interface{}
+type DataCenterInfo struct {
+ Id NodeId `json:"Id"`
+ Racks []RackInfo `json:"Racks"`
+}
+
+func (dc *DataCenter) ToInfo() (info DataCenterInfo) {
+ info.Id = dc.Id()
+ var racks []RackInfo
for _, c := range dc.Children() {
rack := c.(*Rack)
- racks = append(racks, rack.ToMap())
+ racks = append(racks, rack.ToInfo())
}
- m["Racks"] = racks
- return m
+
+ slices.SortFunc(racks, func(a, b RackInfo) bool {
+ return a.Id < b.Id
+ })
+ info.Racks = racks
+ return
}
func (dc *DataCenter) ToDataCenterInfo() *master_pb.DataCenterInfo {
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 6bdbd965f..33bff2d59 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -217,10 +217,18 @@ func (dn *DataNode) ServerAddress() pb.ServerAddress {
return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
}
-func (dn *DataNode) ToMap() interface{} {
- ret := make(map[string]interface{})
- ret["Url"] = dn.Url()
- ret["PublicUrl"] = dn.PublicUrl
+type DataNodeInfo struct {
+ Url string `json:"Url"`
+ PublicUrl string `json:"PublicUrl"`
+ Volumes int64 `json:"Volumes"`
+ EcShards int64 `json:"EcShards"`
+ Max int64 `json:"Max"`
+ VolumeIds string `json:"VolumeIds"`
+}
+
+func (dn *DataNode) ToInfo() (info DataNodeInfo) {
+ info.Url = dn.Url()
+ info.PublicUrl = dn.PublicUrl
// aggregated volume info
var volumeCount, ecShardCount, maxVolumeCount int64
@@ -236,12 +244,12 @@ func (dn *DataNode) ToMap() interface{} {
volumeIds += " " + d.GetVolumeIds()
}
- ret["Volumes"] = volumeCount
- ret["EcShards"] = ecShardCount
- ret["Max"] = maxVolumeCount
- ret["VolumeIds"] = volumeIds
+ info.Volumes = volumeCount
+ info.EcShards = ecShardCount
+ info.Max = maxVolumeCount
+ info.VolumeIds = volumeIds
- return ret
+ return
}
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index cd09746b2..7b0ed4a54 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "golang.org/x/exp/slices"
"time"
)
@@ -53,16 +54,25 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl
return dn
}
-func (r *Rack) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Id"] = r.Id()
- var dns []interface{}
+type RackInfo struct {
+ Id NodeId `json:"Id"`
+ DataNodes []DataNodeInfo `json:"DataNodes"`
+}
+
+func (r *Rack) ToInfo() (info RackInfo) {
+ info.Id = r.Id()
+ var dns []DataNodeInfo
for _, c := range r.Children() {
dn := c.(*DataNode)
- dns = append(dns, dn.ToMap())
+ dns = append(dns, dn.ToInfo())
}
- m["DataNodes"] = dns
- return m
+
+ slices.SortFunc(dns, func(a, b DataNodeInfo) bool {
+ return a.Url < b.Url
+ })
+
+ info.DataNodes = dns
+ return
}
func (r *Rack) ToRackInfo() *master_pb.RackInfo {
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_info.go
index 0fedb6221..21ce77edf 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_info.go
@@ -1,30 +1,44 @@
package topology
-import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "golang.org/x/exp/slices"
+)
-func (t *Topology) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.diskUsages.GetMaxVolumeCount()
- m["Free"] = t.diskUsages.FreeSpace()
- var dcs []interface{}
+type TopologyInfo struct {
+ Max int64 `json:"Max"`
+ Free int64 `json:"Free"`
+ DataCenters []DataCenterInfo `json:"DataCenters"`
+ Layouts []VolumeLayoutInfo `json:"Layouts"`
+}
+
+func (t *Topology) ToInfo() (info TopologyInfo) {
+ info.Max = t.diskUsages.GetMaxVolumeCount()
+ info.Free = t.diskUsages.FreeSpace()
+ var dcs []DataCenterInfo
for _, c := range t.Children() {
dc := c.(*DataCenter)
- dcs = append(dcs, dc.ToMap())
+ dcs = append(dcs, dc.ToInfo())
}
- m["DataCenters"] = dcs
- var layouts []interface{}
+
+ slices.SortFunc(dcs, func(a, b DataCenterInfo) bool {
+ return a.Id < b.Id
+ })
+
+ info.DataCenters = dcs
+ var layouts []VolumeLayoutInfo
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items() {
if layout != nil {
- tmp := layout.(*VolumeLayout).ToMap()
- tmp["collection"] = c.Name
+ tmp := layout.(*VolumeLayout).ToInfo()
+ tmp.Collection = c.Name
layouts = append(layouts, tmp)
}
}
}
- m["Layouts"] = layouts
- return m
+ info.Layouts = layouts
+ return
}
func (t *Topology) ToVolumeMap() interface{} {
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index dee82762a..03c4c4adf 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -473,13 +473,19 @@ func (vl *VolumeLayout) SetVolumeCrowded(vid needle.VolumeId) {
}
}
-func (vl *VolumeLayout) ToMap() map[string]interface{} {
- m := make(map[string]interface{})
- m["replication"] = vl.rp.String()
- m["ttl"] = vl.ttl.String()
- m["writables"] = vl.writables
+type VolumeLayoutInfo struct {
+ Replication string `json:"replication"`
+ TTL string `json:"ttl"`
+ Writables []needle.VolumeId `json:"writables"`
+ Collection string `json:"collection"`
+}
+
+func (vl *VolumeLayout) ToInfo() (info VolumeLayoutInfo) {
+ info.Replication = vl.rp.String()
+ info.TTL = vl.ttl.String()
+ info.Writables = vl.writables
//m["locations"] = vl.vid2location
- return m
+ return
}
func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
diff --git a/weed/util/retry.go b/weed/util/retry.go
index b7cd278b3..892341dc1 100644
--- a/weed/util/retry.go
+++ b/weed/util/retry.go
@@ -32,7 +32,7 @@ func Retry(name string, job func() error) (err error) {
return err
}
-func RetryForever(name string, job func() error, onErrFn func(err error) bool) {
+func RetryForever(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) {
waitTime := time.Second
for {
err := job()
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 1ff735788..2d0b00b76 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,6 +2,7 @@ package wdclient
import (
"context"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"math/rand"
"time"
@@ -46,7 +47,7 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
fullUrls, err = mc.vidMap.LookupFileId(fileId)
- if err == nil {
+ if err == nil && len(fullUrls) > 0 {
return
}
err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
@@ -54,7 +55,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
VolumeOrFileIds: []string{fileId},
})
if err != nil {
- return err
+ return fmt.Errorf("LookupVolume failed: %v", err)
}
for vid, vidLocation := range resp.VolumeIdLocations {
for _, vidLoc := range vidLocation.Locations {
@@ -67,7 +68,6 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
}
}
-
return nil
})
return