diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-13 00:03:47 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-13 00:03:47 -0700 |
| commit | b771fefa374fe237ff1317bbd03a9297a52191e3 (patch) | |
| tree | 9856be72db18e6ae32523640cd3a9e3ca61fbeeb /weed | |
| parent | 0a851ec00b455c72b405503f6f1f41728b15962e (diff) | |
| parent | 8908810376e671b34262295f1ad558ca43db58c2 (diff) | |
| download | seaweedfs-b771fefa374fe237ff1317bbd03a9297a52191e3.tar.xz seaweedfs-b771fefa374fe237ff1317bbd03a9297a52191e3.zip | |
Merge branch 'master' into sub
Diffstat (limited to 'weed')
38 files changed, 871 insertions, 342 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 7f132892e..7f9a23cf8 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/filer.go b/weed/command/filer.go index 83e2abdac..7e636974f 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -33,7 +33,7 @@ var ( ) type FilerOptions struct { - masters map[string]pb.ServerAddress + masters *pb.ServerDiscovery mastersString *string ip *string bindIp *string @@ -65,7 +65,7 @@ type FilerOptions struct { func init() { cmdFiler.Run = runFiler // break init cycle - f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers or a single DNS SRV record of at least 1 master server, prepended with dnssrv+") f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") @@ -208,7 +208,7 @@ func runFiler(cmd *Command, args []string) bool { }(startDelay) } - f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap() + f.masters = pb.ServerAddresses(*f.mastersString).ToServiceDiscovery() f.startFiler() diff --git a/weed/command/server.go b/weed/command/server.go index fecb1cad6..7fbb59676 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -203,7 +203,7 @@ func runServer(cmd *Command, args []string) bool { // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap() + filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToServiceDiscovery() filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp s3Options.bindIp = serverBindIp @@ -216,7 +216,7 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack mqBrokerOptions.ip = serverIp - mqBrokerOptions.masters = filerOptions.masters + mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap() mqBrokerOptions.filerGroup = filerOptions.filerGroup // serverOptions.v.pulseSeconds = pulseSeconds diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 8570faa7a..fdc425f07 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -52,8 +52,7 @@ type Filer struct { Dlm *lock_manager.DistributedLockManager } -func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, - filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { +func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters), fileIdDeletionQueue: util.NewUnboundedQueue(), diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 0fac6a138..0eecdd6cb 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -292,10 +292,8 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u count := int64(0) for count < limit && len(notPrefixed) > 0 { - var isLastItemHasPrefix bool for _, entry := range notPrefixed { if strings.HasPrefix(entry.Name(), prefix) { - isLastItemHasPrefix = true count++ if !eachEntryFunc(entry) { return @@ -303,11 +301,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u if count >= limit { break } - } else { - isLastItemHasPrefix = false } } - if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) { + if count < limit && lastFileName <= prefix && len(notPrefixed) == int(limit) { notPrefixed = notPrefixed[:0] lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool { notPrefixed = append(notPrefixed, entry) diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index 7013f67a7..c8e71a003 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -3,6 +3,7 @@ package leveldb import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "os" "testing" "time" @@ -12,7 +13,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) @@ -65,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) @@ -87,7 +88,7 @@ func TestEmptyRoot(t *testing.T) { } func BenchmarkInsertEntry(b *testing.B) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := b.TempDir() store := &LevelDBStore{} store.initialize(dir) diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index f7ec99e06..b25dcc7b8 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -2,6 +2,7 @@ package leveldb import ( "context" + "github.com/seaweedfs/seaweedfs/weed/pb" "testing" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -9,7 +10,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) @@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go index e2e4d5099..a2d8dd8a3 100644 --- a/weed/filer/leveldb3/leveldb3_store_test.go +++ b/weed/filer/leveldb3/leveldb3_store_test.go @@ -2,6 +2,7 @@ package leveldb import ( "context" + "github.com/seaweedfs/seaweedfs/weed/pb" "testing" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -9,7 +10,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) @@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go index e89327baa..e24274d2a 100644 --- a/weed/filer/rocksdb/rocksdb_store_test.go +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -15,7 +15,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) @@ -68,7 +68,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) @@ -90,7 +90,7 @@ func TestEmptyRoot(t *testing.T) { } func BenchmarkInsertEntry(b *testing.B) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := b.TempDir() store := &RocksDBStore{} store.initialize(dir) diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index 223bcb296..63d2e7a75 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -50,7 +50,7 @@ var s3ApiConfigure IamS3ApiConfig func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) { s3ApiConfigure = IamS3ApiConfigure{ option: option, - masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", option.Masters), + masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)), } s3Option := s3api.S3ApiServerOption{Filer: option.Filer} iamApiServer = &IamApiServer{ diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 22da80cb6..aceaab82b 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -41,7 +41,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), filers: make(map[pb.ServerAddress]struct{}), localTopicManager: topic.NewLocalTopicManager(), } diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index c2f5a806d..f6d0e0110 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "sync" ) type VolumeAssignRequest struct { @@ -34,6 +34,110 @@ type AssignResult struct { Replicas []Location `json:"replicas,omitempty"` } +// This is a proxy to the master server, only for assigning volume ids. +// It runs via grpc to the master server in streaming mode. +// The connection to the master would only be re-established when the last connection has error. +type AssignProxy struct { + grpcConnection *grpc.ClientConn + pool chan *singleThreadAssignProxy +} + +func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) { + ap = &AssignProxy{ + pool: make(chan *singleThreadAssignProxy, concurrency), + } + ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption) + if err != nil { + return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err) + } + for i := 0; i < concurrency; i++ { + ap.pool <- &singleThreadAssignProxy{} + } + return ap, nil +} + +func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + p := <-ap.pool + defer func() { + ap.pool <- p + }() + + return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...) +} + +type singleThreadAssignProxy struct { + assignClient master_pb.Seaweed_StreamAssignClient + sync.Mutex +} + +func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + ap.Lock() + defer ap.Unlock() + + if ap.assignClient == nil { + client := master_pb.NewSeaweedClient(grpcConnection) + ap.assignClient, err = client.StreamAssign(context.Background()) + if err != nil { + ap.assignClient = nil + return nil, fmt.Errorf("fail to create stream assign client: %v", err) + } + } + + var requests []*VolumeAssignRequest + requests = append(requests, primaryRequest) + requests = append(requests, alternativeRequests...) + ret = &AssignResult{} + + for _, request := range requests { + if request == nil { + continue + } + req := &master_pb.AssignRequest{ + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DiskType: request.DiskType, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, + } + if err = ap.assignClient.Send(req); err != nil { + return nil, fmt.Errorf("StreamAssignSend: %v", err) + } + resp, grpcErr := ap.assignClient.Recv() + if grpcErr != nil { + return nil, grpcErr + } + if resp.Error != "" { + return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error) + } + + ret.Count = resp.Count + ret.Fid = resp.Fid + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) + ret.Error = resp.Error + ret.Auth = security.EncodedJwt(resp.Auth) + for _, r := range resp.Replicas { + ret.Replicas = append(ret.Replicas, Location{ + Url: r.Url, + PublicUrl: r.PublicUrl, + DataCenter: r.DataCenter, + }) + } + + if ret.Count <= 0 { + continue + } + break + } + + return +} + func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go new file mode 100644 index 000000000..f6362dceb --- /dev/null +++ b/weed/operation/assign_file_id_test.go @@ -0,0 +1,68 @@ +package operation + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" + "testing" + "time" +) + +func BenchmarkWithConcurrency(b *testing.B) { + concurrencyLevels := []int{1, 10, 100, 1000} + + ap, _ := NewAssignProxy(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), 16) + + for _, concurrency := range concurrencyLevels { + b.Run( + fmt.Sprintf("Concurrency-%d", concurrency), + func(b *testing.B) { + for i := 0; i < b.N; i++ { + done := make(chan struct{}) + startTime := time.Now() + + for j := 0; j < concurrency; j++ { + go func() { + + ap.Assign(&VolumeAssignRequest{ + Count: 1, + }) + + done <- struct{}{} + }() + } + + for j := 0; j < concurrency; j++ { + <-done + } + + duration := time.Since(startTime) + b.Logf("Concurrency: %d, Duration: %v", concurrency, duration) + } + }, + ) + } +} + +func BenchmarkStreamAssign(b *testing.B) { + ap, _ := NewAssignProxy(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), 16) + for i := 0; i < b.N; i++ { + ap.Assign(&VolumeAssignRequest{ + Count: 1, + }) + } +} + +func BenchmarkUnaryAssign(b *testing.B) { + for i := 0; i < b.N; i++ { + Assign(func() pb.ServerAddress { + return pb.ServerAddress("localhost:9333") + }, grpc.WithInsecure(), &VolumeAssignRequest{ + Count: 1, + }) + } +} diff --git a/weed/pb/master.proto b/weed/pb/master.proto index be4f4a78b..94277104f 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -15,6 +15,8 @@ service Seaweed { } rpc Assign (AssignRequest) returns (AssignResponse) { } + rpc StreamAssign (stream AssignRequest) returns (stream AssignResponse) { + } rpc Statistics (StatisticsRequest) returns (StatisticsResponse) { } rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) { diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 5c0b5d774..4e61faafe 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -4591,7 +4591,7 @@ var file_master_proto_rawDesc = []byte{ 0x72, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x75, 0x66, 0x66, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x32, 0xbd, 0x0e, 0x0a, 0x07, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0d, + 0x32, 0x88, 0x0f, 0x0a, 0x07, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0d, 0x53, 0x65, 0x6e, 0x64, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, 0x14, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x1a, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, @@ -4611,106 +4611,111 @@ var file_master_proto_rawDesc = []byte{ 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, - 0x69, 0x63, 0x73, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x4c, 0x69, 0x73, 0x74, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x0c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, + 0x73, 0x73, 0x69, 0x67, 0x6e, 0x12, 0x18, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, + 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x19, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, + 0x67, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x12, 0x1c, + 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, + 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, + 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, + 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, + 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x12, + 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, + 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x10, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, + 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, + 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, + 0x69, 0x73, 0x74, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, + 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, + 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, + 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, - 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x69, 0x73, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x10, 0x43, - 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, - 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, - 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0a, 0x56, 0x6f, - 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1c, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, - 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, - 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, - 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, - 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x6d, 0x61, - 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, - 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x51, 0x0a, 0x0c, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, - 0x12, 0x1e, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x61, 0x63, - 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x61, 0x63, - 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x0d, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, - 0x63, 0x75, 0x75, 0x6d, 0x12, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0c, 0x45, 0x6e, 0x61, - 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x12, 0x1e, 0x2e, 0x6d, 0x61, 0x73, 0x74, - 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, - 0x75, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, - 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, - 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x12, + 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x45, 0x63, 0x56, 0x6f, 0x6c, 0x75, 0x6d, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0c, 0x56, + 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x61, + 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, + 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x61, + 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, + 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, + 0x0a, 0x0d, 0x44, 0x69, 0x73, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x12, + 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x69, 0x73, 0x61, + 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x69, 0x73, + 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0c, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, + 0x63, 0x75, 0x75, 0x6d, 0x12, 0x1e, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x45, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x63, 0x0a, 0x12, 0x56, 0x6f, 0x6c, 0x75, 0x6d, + 0x65, 0x4d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x61, 0x64, 0x6f, 0x6e, 0x6c, 0x79, 0x12, 0x24, 0x2e, + 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, + 0x4d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x61, 0x64, 0x6f, 0x6e, 0x6c, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x61, 0x64, 0x6f, 0x6e, - 0x6c, 0x79, 0x12, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, - 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4d, 0x61, 0x72, 0x6b, 0x52, 0x65, 0x61, 0x64, 0x6f, 0x6e, 0x6c, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x4d, 0x61, 0x72, 0x6b, 0x52, - 0x65, 0x61, 0x64, 0x6f, 0x6e, 0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x6f, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x61, - 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, - 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x10, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, - 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, - 0x64, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, - 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x5a, 0x0a, 0x0f, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, - 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, - 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, - 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, - 0x11, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, - 0x65, 0x6e, 0x12, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, + 0x6c, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x16, + 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x28, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, + 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x29, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, + 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, + 0x10, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, + 0x73, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, + 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x6f, 0x64, + 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5a, 0x0a, 0x0f, + 0x4c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, + 0x21, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x65, 0x61, 0x73, + 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, + 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x6c, 0x65, + 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x23, 0x2e, + 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, + 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x54, 0x6f, 0x6b, 0x65, 0x6e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, - 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x41, 0x64, 0x6d, 0x69, 0x6e, - 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x39, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x16, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, - 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x17, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x69, 0x6e, 0x67, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x16, 0x52, 0x61, - 0x66, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x73, 0x12, 0x28, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, - 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4c, - 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x0d, 0x52, - 0x61, 0x66, 0x74, 0x41, 0x64, 0x64, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x1f, 0x2e, 0x6d, - 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x41, 0x64, 0x64, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, - 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x41, 0x64, - 0x64, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x5d, 0x0a, 0x10, 0x52, 0x61, 0x66, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x53, - 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, - 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, - 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, - 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, - 0x72, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x04, 0x50, 0x69, + 0x6e, 0x67, 0x12, 0x16, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x50, + 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x6d, 0x61, 0x73, + 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x16, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x69, 0x73, + 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, + 0x28, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, + 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x6d, 0x61, 0x73, 0x74, + 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x54, 0x0a, 0x0d, 0x52, 0x61, 0x66, 0x74, 0x41, 0x64, + 0x64, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x1f, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, + 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x41, 0x64, 0x64, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, + 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, 0x74, 0x41, 0x64, 0x64, 0x53, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, 0x0a, 0x10, + 0x52, 0x61, 0x66, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x12, 0x22, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x61, 0x66, + 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x52, 0x61, 0x66, 0x74, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x32, 0x5a, 0x30, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, + 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, + 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -4837,46 +4842,48 @@ var file_master_proto_depIdxs = []int32{ 8, // 37: master_pb.Seaweed.KeepConnected:input_type -> master_pb.KeepConnectedRequest 12, // 38: master_pb.Seaweed.LookupVolume:input_type -> master_pb.LookupVolumeRequest 15, // 39: master_pb.Seaweed.Assign:input_type -> master_pb.AssignRequest - 17, // 40: master_pb.Seaweed.Statistics:input_type -> master_pb.StatisticsRequest - 20, // 41: master_pb.Seaweed.CollectionList:input_type -> master_pb.CollectionListRequest - 22, // 42: master_pb.Seaweed.CollectionDelete:input_type -> master_pb.CollectionDeleteRequest - 29, // 43: master_pb.Seaweed.VolumeList:input_type -> master_pb.VolumeListRequest - 31, // 44: master_pb.Seaweed.LookupEcVolume:input_type -> master_pb.LookupEcVolumeRequest - 33, // 45: master_pb.Seaweed.VacuumVolume:input_type -> master_pb.VacuumVolumeRequest - 35, // 46: master_pb.Seaweed.DisableVacuum:input_type -> master_pb.DisableVacuumRequest - 37, // 47: master_pb.Seaweed.EnableVacuum:input_type -> master_pb.EnableVacuumRequest - 39, // 48: master_pb.Seaweed.VolumeMarkReadonly:input_type -> master_pb.VolumeMarkReadonlyRequest - 41, // 49: master_pb.Seaweed.GetMasterConfiguration:input_type -> master_pb.GetMasterConfigurationRequest - 43, // 50: master_pb.Seaweed.ListClusterNodes:input_type -> master_pb.ListClusterNodesRequest - 45, // 51: master_pb.Seaweed.LeaseAdminToken:input_type -> master_pb.LeaseAdminTokenRequest - 47, // 52: master_pb.Seaweed.ReleaseAdminToken:input_type -> master_pb.ReleaseAdminTokenRequest - 49, // 53: master_pb.Seaweed.Ping:input_type -> master_pb.PingRequest - 55, // 54: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest - 51, // 55: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest - 53, // 56: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest - 1, // 57: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse - 11, // 58: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse - 13, // 59: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse - 16, // 60: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse - 18, // 61: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse - 21, // 62: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse - 23, // 63: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse - 30, // 64: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse - 32, // 65: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse - 34, // 66: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse - 36, // 67: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse - 38, // 68: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse - 40, // 69: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse - 42, // 70: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse - 44, // 71: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse - 46, // 72: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse - 48, // 73: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse - 50, // 74: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse - 56, // 75: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse - 52, // 76: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse - 54, // 77: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse - 57, // [57:78] is the sub-list for method output_type - 36, // [36:57] is the sub-list for method input_type + 15, // 40: master_pb.Seaweed.StreamAssign:input_type -> master_pb.AssignRequest + 17, // 41: master_pb.Seaweed.Statistics:input_type -> master_pb.StatisticsRequest + 20, // 42: master_pb.Seaweed.CollectionList:input_type -> master_pb.CollectionListRequest + 22, // 43: master_pb.Seaweed.CollectionDelete:input_type -> master_pb.CollectionDeleteRequest + 29, // 44: master_pb.Seaweed.VolumeList:input_type -> master_pb.VolumeListRequest + 31, // 45: master_pb.Seaweed.LookupEcVolume:input_type -> master_pb.LookupEcVolumeRequest + 33, // 46: master_pb.Seaweed.VacuumVolume:input_type -> master_pb.VacuumVolumeRequest + 35, // 47: master_pb.Seaweed.DisableVacuum:input_type -> master_pb.DisableVacuumRequest + 37, // 48: master_pb.Seaweed.EnableVacuum:input_type -> master_pb.EnableVacuumRequest + 39, // 49: master_pb.Seaweed.VolumeMarkReadonly:input_type -> master_pb.VolumeMarkReadonlyRequest + 41, // 50: master_pb.Seaweed.GetMasterConfiguration:input_type -> master_pb.GetMasterConfigurationRequest + 43, // 51: master_pb.Seaweed.ListClusterNodes:input_type -> master_pb.ListClusterNodesRequest + 45, // 52: master_pb.Seaweed.LeaseAdminToken:input_type -> master_pb.LeaseAdminTokenRequest + 47, // 53: master_pb.Seaweed.ReleaseAdminToken:input_type -> master_pb.ReleaseAdminTokenRequest + 49, // 54: master_pb.Seaweed.Ping:input_type -> master_pb.PingRequest + 55, // 55: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest + 51, // 56: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest + 53, // 57: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest + 1, // 58: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse + 11, // 59: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse + 13, // 60: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse + 16, // 61: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse + 16, // 62: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse + 18, // 63: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse + 21, // 64: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse + 23, // 65: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse + 30, // 66: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse + 32, // 67: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse + 34, // 68: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse + 36, // 69: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse + 38, // 70: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse + 40, // 71: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse + 42, // 72: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse + 44, // 73: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse + 46, // 74: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse + 48, // 75: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse + 50, // 76: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse + 56, // 77: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse + 52, // 78: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse + 54, // 79: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse + 58, // [58:80] is the sub-list for method output_type + 36, // [36:58] is the sub-list for method input_type 36, // [36:36] is the sub-list for extension type_name 36, // [36:36] is the sub-list for extension extendee 0, // [0:36] is the sub-list for field type_name diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go index b0c3f932c..2afcdbbd9 100644 --- a/weed/pb/master_pb/master_grpc.pb.go +++ b/weed/pb/master_pb/master_grpc.pb.go @@ -26,6 +26,7 @@ type SeaweedClient interface { KeepConnected(ctx context.Context, opts ...grpc.CallOption) (Seaweed_KeepConnectedClient, error) LookupVolume(ctx context.Context, in *LookupVolumeRequest, opts ...grpc.CallOption) (*LookupVolumeResponse, error) Assign(ctx context.Context, in *AssignRequest, opts ...grpc.CallOption) (*AssignResponse, error) + StreamAssign(ctx context.Context, opts ...grpc.CallOption) (Seaweed_StreamAssignClient, error) Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error) CollectionList(ctx context.Context, in *CollectionListRequest, opts ...grpc.CallOption) (*CollectionListResponse, error) CollectionDelete(ctx context.Context, in *CollectionDeleteRequest, opts ...grpc.CallOption) (*CollectionDeleteResponse, error) @@ -133,6 +134,37 @@ func (c *seaweedClient) Assign(ctx context.Context, in *AssignRequest, opts ...g return out, nil } +func (c *seaweedClient) StreamAssign(ctx context.Context, opts ...grpc.CallOption) (Seaweed_StreamAssignClient, error) { + stream, err := c.cc.NewStream(ctx, &Seaweed_ServiceDesc.Streams[2], "/master_pb.Seaweed/StreamAssign", opts...) + if err != nil { + return nil, err + } + x := &seaweedStreamAssignClient{stream} + return x, nil +} + +type Seaweed_StreamAssignClient interface { + Send(*AssignRequest) error + Recv() (*AssignResponse, error) + grpc.ClientStream +} + +type seaweedStreamAssignClient struct { + grpc.ClientStream +} + +func (x *seaweedStreamAssignClient) Send(m *AssignRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *seaweedStreamAssignClient) Recv() (*AssignResponse, error) { + m := new(AssignResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *seaweedClient) Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error) { out := new(StatisticsResponse) err := c.cc.Invoke(ctx, "/master_pb.Seaweed/Statistics", in, out, opts...) @@ -294,6 +326,7 @@ type SeaweedServer interface { KeepConnected(Seaweed_KeepConnectedServer) error LookupVolume(context.Context, *LookupVolumeRequest) (*LookupVolumeResponse, error) Assign(context.Context, *AssignRequest) (*AssignResponse, error) + StreamAssign(Seaweed_StreamAssignServer) error Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error) CollectionList(context.Context, *CollectionListRequest) (*CollectionListResponse, error) CollectionDelete(context.Context, *CollectionDeleteRequest) (*CollectionDeleteResponse, error) @@ -330,6 +363,9 @@ func (UnimplementedSeaweedServer) LookupVolume(context.Context, *LookupVolumeReq func (UnimplementedSeaweedServer) Assign(context.Context, *AssignRequest) (*AssignResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Assign not implemented") } +func (UnimplementedSeaweedServer) StreamAssign(Seaweed_StreamAssignServer) error { + return status.Errorf(codes.Unimplemented, "method StreamAssign not implemented") +} func (UnimplementedSeaweedServer) Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Statistics not implemented") } @@ -482,6 +518,32 @@ func _Seaweed_Assign_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Seaweed_StreamAssign_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SeaweedServer).StreamAssign(&seaweedStreamAssignServer{stream}) +} + +type Seaweed_StreamAssignServer interface { + Send(*AssignResponse) error + Recv() (*AssignRequest, error) + grpc.ServerStream +} + +type seaweedStreamAssignServer struct { + grpc.ServerStream +} + +func (x *seaweedStreamAssignServer) Send(m *AssignResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *seaweedStreamAssignServer) Recv() (*AssignRequest, error) { + m := new(AssignRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func _Seaweed_Statistics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(StatisticsRequest) if err := dec(in); err != nil { @@ -885,6 +947,12 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "StreamAssign", + Handler: _Seaweed_StreamAssign_Handler, + ServerStreams: true, + ClientStreams: true, + }, }, Metadata: "master.proto", } diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 56d0dba24..a0aa79ae4 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -11,6 +11,7 @@ import ( type ServerAddress string type ServerAddresses string +type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { if grpcPort == 0 || grpcPort == port+10000 { @@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string { return ServerToGrpcAddress(string(sa)) } +// LookUp may return an error for some records along with successful lookups - make sure you do not +// discard `addresses` even if `err == nil` +func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) { + _, records, lookupErr := net.LookupSRV("", "", string(r)) + if lookupErr != nil { + err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr) + } + for _, srv := range records { + address := fmt.Sprintf("%s:%d", srv.Target, srv.Port) + addresses = append(addresses, ServerAddress(address)) + } + return +} + +// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like +// +// 10.0.0.1:9999,10.0.0.2:24:9999 +// +// OR an SRV Record prepended with 'dnssrv+', like: +// +// dnssrv+_grpc._tcp.master.consul +// dnssrv+_grpc._tcp.headless.default.svc.cluster.local +// dnssrv+seaweed-master.master.consul +func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + prefix := "dnssrv+" + if strings.HasPrefix(string(sa), prefix) { + trimmed := strings.TrimPrefix(string(sa), prefix) + srv := ServerSrvAddress(trimmed) + sd.srvRecord = &srv + } else { + sd.list = sa.ToAddresses() + } + return +} + func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) { parts := strings.Split(string(sa), ",") for _, address := range parts { diff --git a/weed/pb/server_address_test.go b/weed/pb/server_address_test.go new file mode 100644 index 000000000..f5a12427a --- /dev/null +++ b/weed/pb/server_address_test.go @@ -0,0 +1,36 @@ +package pb + +import ( + "reflect" + "testing" +) + +func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) { + str := ServerAddresses("dnssrv+hello.srv.consul") + + d := str.ToServiceDiscovery() + + expected := ServerSrvAddress("hello.srv.consul") + if *d.srvRecord != expected { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected) + } +} + +func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) { + str := ServerAddresses("10.0.0.1:23,10.0.0.2:24") + + d := str.ToServiceDiscovery() + + if d.srvRecord != nil { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord) + } + + expected := []ServerAddress{ + ServerAddress("10.0.0.1:23"), + ServerAddress("10.0.0.2:24"), + } + + if !reflect.DeepEqual(d.list, expected) { + t.Fatalf(`Expected %q, got %q`, expected, d.list) + } +} diff --git a/weed/pb/server_discovery.go b/weed/pb/server_discovery.go new file mode 100644 index 000000000..25c0360c5 --- /dev/null +++ b/weed/pb/server_discovery.go @@ -0,0 +1,62 @@ +package pb + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "reflect" +) + +// ServerDiscovery encodes a way to find at least 1 instance of a service, +// and provides utility functions to refresh the instance list +type ServerDiscovery struct { + list []ServerAddress + srvRecord *ServerSrvAddress +} + +func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + for _, s := range m { + sd.list = append(sd.list, s) + } + return sd +} + +// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results +// of the lookup +func (sd *ServerDiscovery) RefreshBySrvIfAvailable() { + if sd.srvRecord == nil { + return + } + newList, err := sd.srvRecord.LookUp() + if err != nil { + glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err) + } + if newList == nil || len(newList) == 0 { + glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord) + return + } + if !reflect.DeepEqual(sd.list, newList) { + sd.list = newList + } +} + +// GetInstances returns a copy of the latest known list of addresses +// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view +func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) { + for _, a := range sd.list { + addresses = append(addresses, a) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) { + for _, i := range sd.list { + addresses = append(addresses, string(i)) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) { + addresses = make(map[string]ServerAddress) + for _, i := range sd.list { + addresses[string(i)] = i + } + return addresses +} diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index 876acd7cf..234dc100b 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -2,12 +2,13 @@ package s3api import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3account" "net/http" "os" "strings" "sync" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3account" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -31,6 +32,9 @@ type IdentityAccessManagement struct { identities []*Identity isAuthEnabled bool domain string + hashes map[string]*sync.Pool + hashCounters map[string]*int32 + hashMu sync.RWMutex } type Identity struct { @@ -76,7 +80,9 @@ func (action Action) getPermission() Permission { func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManagement { iam := &IdentityAccessManagement{ - domain: option.DomainName, + domain: option.DomainName, + hashes: make(map[string]*sync.Pool), + hashCounters: make(map[string]*int32), } if option.Config != "" { if err := iam.loadS3ApiConfigurationFromFile(option.Config); err != nil { diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go index 02a6bd4e0..04548cc6f 100644 --- a/weed/s3api/auth_signature_v4.go +++ b/weed/s3api/auth_signature_v4.go @@ -23,6 +23,7 @@ import ( "crypto/sha256" "crypto/subtle" "encoding/hex" + "hash" "io" "net/http" "net/url" @@ -30,6 +31,8 @@ import ( "sort" "strconv" "strings" + "sync" + "sync/atomic" "time" "unicode/utf8" @@ -151,14 +154,14 @@ func (iam *IdentityAccessManagement) doesSignatureMatch(hashedPayload string, r // Get string to sign from canonical request. stringToSign := getStringToSign(canonicalRequest, t, signV4Values.Credential.getScope()) - // Get hmac signing key. - signingKey := getSigningKey(cred.SecretKey, + // Calculate signature. + newSignature := iam.getSignature( + cred.SecretKey, signV4Values.Credential.scope.date, signV4Values.Credential.scope.region, - signV4Values.Credential.scope.service) - - // Calculate signature. - newSignature := getSignature(signingKey, stringToSign) + signV4Values.Credential.scope.service, + stringToSign, + ) // Verify if signature match. if !compareSignatureV4(newSignature, signV4Values.Signature) { @@ -325,11 +328,14 @@ func (iam *IdentityAccessManagement) doesPolicySignatureV4Match(formValues http. return s3err.ErrInvalidAccessKeyID } - // Get signing key. - signingKey := getSigningKey(cred.SecretKey, credHeader.scope.date, credHeader.scope.region, credHeader.scope.service) - // Get signature. - newSignature := getSignature(signingKey, formValues.Get("Policy")) + newSignature := iam.getSignature( + cred.SecretKey, + credHeader.scope.date, + credHeader.scope.region, + credHeader.scope.service, + formValues.Get("Policy"), + ) // Verify signature. if !compareSignatureV4(newSignature, formValues.Get("X-Amz-Signature")) { @@ -442,14 +448,14 @@ func (iam *IdentityAccessManagement) doesPresignedSignatureMatch(hashedPayload s // Get string to sign from canonical request. presignedStringToSign := getStringToSign(presignedCanonicalReq, t, pSignValues.Credential.getScope()) - // Get hmac presigned signing key. - presignedSigningKey := getSigningKey(cred.SecretKey, + // Get new signature. + newSignature := iam.getSignature( + cred.SecretKey, pSignValues.Credential.scope.date, pSignValues.Credential.scope.region, - pSignValues.Credential.scope.service) - - // Get new signature. - newSignature := getSignature(presignedSigningKey, presignedStringToSign) + pSignValues.Credential.scope.service, + presignedStringToSign, + ) // Verify signature. if !compareSignatureV4(req.URL.Query().Get("X-Amz-Signature"), newSignature) { @@ -458,6 +464,69 @@ func (iam *IdentityAccessManagement) doesPresignedSignatureMatch(hashedPayload s return identity, s3err.ErrNone } +func (iam *IdentityAccessManagement) getSignature(secretKey string, t time.Time, region string, service string, stringToSign string) string { + pool := iam.getSignatureHashPool(secretKey, t, region, service) + h := pool.Get().(hash.Hash) + defer pool.Put(h) + + h.Reset() + h.Write([]byte(stringToSign)) + sig := hex.EncodeToString(h.Sum(nil)) + + return sig +} + +func (iam *IdentityAccessManagement) getSignatureHashPool(secretKey string, t time.Time, region string, service string) *sync.Pool { + // Build a caching key for the pool. + date := t.Format(yyyymmdd) + hashID := "AWS4" + secretKey + "/" + date + "/" + region + "/" + service + "/" + "aws4_request" + + // Try to find an existing pool and return it. + iam.hashMu.RLock() + pool, ok := iam.hashes[hashID] + iam.hashMu.RUnlock() + + if !ok { + iam.hashMu.Lock() + defer iam.hashMu.Unlock() + pool, ok = iam.hashes[hashID] + } + + if ok { + atomic.StoreInt32(iam.hashCounters[hashID], 1) + return pool + } + + // Create a pool that returns HMAC hashers for the requested parameters to avoid expensive re-initializing + // of new instances on every request. + iam.hashes[hashID] = &sync.Pool{ + New: func() any { + signingKey := getSigningKey(secretKey, date, region, service) + return hmac.New(sha256.New, signingKey) + }, + } + iam.hashCounters[hashID] = new(int32) + + // Clean up unused pools automatically after one hour of inactivity + ticker := time.NewTicker(time.Hour) + go func() { + for range ticker.C { + old := atomic.SwapInt32(iam.hashCounters[hashID], 0) + if old == 0 { + break + } + } + + ticker.Stop() + iam.hashMu.Lock() + delete(iam.hashes, hashID) + delete(iam.hashCounters, hashID) + iam.hashMu.Unlock() + }() + + return iam.hashes[hashID] +} + func contains(list []string, elem string) bool { for _, t := range list { if t == elem { @@ -674,19 +743,14 @@ func sumHMAC(key []byte, data []byte) []byte { } // getSigningKey hmac seed to calculate final signature. -func getSigningKey(secretKey string, t time.Time, region string, service string) []byte { - date := sumHMAC([]byte("AWS4"+secretKey), []byte(t.Format(yyyymmdd))) +func getSigningKey(secretKey string, time string, region string, service string) []byte { + date := sumHMAC([]byte("AWS4"+secretKey), []byte(time)) regionBytes := sumHMAC(date, []byte(region)) serviceBytes := sumHMAC(regionBytes, []byte(service)) signingKey := sumHMAC(serviceBytes, []byte("aws4_request")) return signingKey } -// getSignature final signature in hexadecimal form. -func getSignature(signingKey []byte, stringToSign string) string { - return hex.EncodeToString(sumHMAC(signingKey, []byte(stringToSign))) -} - // getCanonicalHeaders generate a list of request headers with their values func getCanonicalHeaders(signedHeaders http.Header) string { var headers []string diff --git a/weed/s3api/auto_signature_v4_test.go b/weed/s3api/auto_signature_v4_test.go index db8bfd8ef..8d0b677f8 100644 --- a/weed/s3api/auto_signature_v4_test.go +++ b/weed/s3api/auto_signature_v4_test.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" "unicode/utf8" @@ -114,7 +115,7 @@ func TestCheckAdminRequestAuthType(t *testing.T) { }{ {Request: mustNewRequest("GET", "http://127.0.0.1:9000", 0, nil, t), ErrCode: s3err.ErrAccessDenied}, {Request: mustNewSignedRequest("GET", "http://127.0.0.1:9000", 0, nil, t), ErrCode: s3err.ErrNone}, - {Request: mustNewPresignedRequest("GET", "http://127.0.0.1:9000", 0, nil, t), ErrCode: s3err.ErrNone}, + {Request: mustNewPresignedRequest(iam, "GET", "http://127.0.0.1:9000", 0, nil, t), ErrCode: s3err.ErrNone}, } for i, testCase := range testCases { if _, s3Error := iam.reqSignatureV4Verify(testCase.Request); s3Error != testCase.ErrCode { @@ -123,6 +124,20 @@ func TestCheckAdminRequestAuthType(t *testing.T) { } } +func BenchmarkGetSignature(b *testing.B) { + t := time.Now() + iam := IdentityAccessManagement{ + hashes: make(map[string]*sync.Pool), + hashCounters: make(map[string]*int32), + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + iam.getSignature("secret-key", t, "us-east-1", "s3", "random data") + } +} + // Provides a fully populated http request instance, fails otherwise. func mustNewRequest(method string, urlStr string, contentLength int64, body io.ReadSeeker, t *testing.T) *http.Request { req, err := newTestRequest(method, urlStr, contentLength, body) @@ -145,10 +160,10 @@ func mustNewSignedRequest(method string, urlStr string, contentLength int64, bod // This is similar to mustNewRequest but additionally the request // is presigned with AWS Signature V4, fails if not able to do so. -func mustNewPresignedRequest(method string, urlStr string, contentLength int64, body io.ReadSeeker, t *testing.T) *http.Request { +func mustNewPresignedRequest(iam *IdentityAccessManagement, method string, urlStr string, contentLength int64, body io.ReadSeeker, t *testing.T) *http.Request { req := mustNewRequest(method, urlStr, contentLength, body, t) cred := &Credential{"access_key_1", "secret_key_1"} - if err := preSignV4(req, cred.AccessKey, cred.SecretKey, int64(10*time.Minute.Seconds())); err != nil { + if err := preSignV4(iam, req, cred.AccessKey, cred.SecretKey, int64(10*time.Minute.Seconds())); err != nil { t.Fatalf("Unable to initialized new signed http request %s", err) } return req @@ -343,7 +358,7 @@ func signRequestV4(req *http.Request, accessKey, secretKey string) error { // preSignV4 presign the request, in accordance with // http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html. -func preSignV4(req *http.Request, accessKeyID, secretAccessKey string, expires int64) error { +func preSignV4(iam *IdentityAccessManagement, req *http.Request, accessKeyID, secretAccessKey string, expires int64) error { // Presign is not needed for anonymous credentials. if accessKeyID == "" || secretAccessKey == "" { return errors.New("Presign cannot be generated without access and secret keys") @@ -370,8 +385,7 @@ func preSignV4(req *http.Request, accessKeyID, secretAccessKey string, expires i queryStr := strings.Replace(query.Encode(), "+", "%20", -1) canonicalRequest := getCanonicalRequest(extractedSignedHeaders, unsignedPayload, queryStr, req.URL.Path, req.Method) stringToSign := getStringToSign(canonicalRequest, date, scope) - signingKey := getSigningKey(secretAccessKey, date, region, "s3") - signature := getSignature(signingKey, stringToSign) + signature := iam.getSignature(secretAccessKey, date, region, "s3", stringToSign) req.URL.RawQuery = query.Encode() diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go index 8ba1bc479..4bf74d025 100644 --- a/weed/s3api/chunked_reader_v4.go +++ b/weed/s3api/chunked_reader_v4.go @@ -24,36 +24,17 @@ import ( "crypto/sha256" "encoding/hex" "errors" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" - "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "hash" "io" "net/http" "time" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/dustin/go-humanize" ) -// getChunkSignature - get chunk signature. -func getChunkSignature(secretKey string, seedSignature string, region string, date time.Time, hashedChunk string) string { - - // Calculate string to sign. - stringToSign := signV4ChunkedAlgorithm + "\n" + - date.Format(iso8601Format) + "\n" + - getScope(date, region) + "\n" + - seedSignature + "\n" + - emptySHA256 + "\n" + - hashedChunk - - // Get hmac signing key. - signingKey := getSigningKey(secretKey, date, region, "s3") - - // Calculate signature. - newSignature := getSignature(signingKey, stringToSign) - - return newSignature -} - // calculateSeedSignature - Calculate seed signature in accordance with // - http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html // @@ -124,11 +105,14 @@ func (iam *IdentityAccessManagement) calculateSeedSignature(r *http.Request) (cr // Get string to sign from canonical request. stringToSign := getStringToSign(canonicalRequest, date, signV4Values.Credential.getScope()) - // Get hmac signing key. - signingKey := getSigningKey(cred.SecretKey, signV4Values.Credential.scope.date, region, "s3") - // Calculate signature. - newSignature := getSignature(signingKey, stringToSign) + newSignature := iam.getSignature( + cred.SecretKey, + signV4Values.Credential.scope.date, + region, + "s3", + stringToSign, + ) // Verify if signature match. if !compareSignatureV4(newSignature, signV4Values.Signature) { @@ -163,6 +147,7 @@ func (iam *IdentityAccessManagement) newSignV4ChunkedReader(req *http.Request) ( region: region, chunkSHA256Writer: sha256.New(), state: readChunkHeader, + iam: iam, }, s3err.ErrNone } @@ -180,6 +165,7 @@ type s3ChunkedReader struct { chunkSHA256Writer hash.Hash // Calculates sha256 of chunk data. n uint64 // Unread bytes in chunk err error + iam *IdentityAccessManagement } // Read chunk reads the chunk token signature portion. @@ -296,7 +282,7 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { // Calculate the hashed chunk. hashedChunk := hex.EncodeToString(cr.chunkSHA256Writer.Sum(nil)) // Calculate the chunk signature. - newSignature := getChunkSignature(cr.cred.SecretKey, cr.seedSignature, cr.region, cr.seedDate, hashedChunk) + newSignature := cr.getChunkSignature(hashedChunk) if !compareSignatureV4(cr.chunkSignature, newSignature) { // Chunk signature doesn't match we return signature does not match. cr.err = errors.New("chunk signature does not match") @@ -317,6 +303,26 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { } } +// getChunkSignature - get chunk signature. +func (cr *s3ChunkedReader) getChunkSignature(hashedChunk string) string { + // Calculate string to sign. + stringToSign := signV4ChunkedAlgorithm + "\n" + + cr.seedDate.Format(iso8601Format) + "\n" + + getScope(cr.seedDate, cr.region) + "\n" + + cr.seedSignature + "\n" + + emptySHA256 + "\n" + + hashedChunk + + // Calculate signature. + return cr.iam.getSignature( + cr.cred.SecretKey, + cr.seedDate, + cr.region, + "s3", + stringToSign, + ) +} + // readCRLF - check if reader only has '\r\n' CRLF character. // returns malformed encoding if it doesn't. func readCRLF(reader io.Reader) error { diff --git a/weed/security/jwt.go b/weed/security/jwt.go index 5d9534c7b..446c3c21d 100644 --- a/weed/security/jwt.go +++ b/weed/security/jwt.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/golang-jwt/jwt" + jwt "github.com/golang-jwt/jwt/v5" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -17,14 +17,14 @@ type SigningKey []byte // restricting the access this JWT allows to only a single file. type SeaweedFileIdClaims struct { Fid string `json:"fid"` - jwt.StandardClaims + jwt.RegisteredClaims } // SeaweedFilerClaims is created e.g. by S3 proxy server and consumed by Filer server. // Right now, it only contains the standard claims; but this might be extended later // for more fine-grained permissions. type SeaweedFilerClaims struct { - jwt.StandardClaims + jwt.RegisteredClaims } func GenJwtForVolumeServer(signingKey SigningKey, expiresAfterSec int, fileId string) EncodedJwt { @@ -34,10 +34,10 @@ func GenJwtForVolumeServer(signingKey SigningKey, expiresAfterSec int, fileId st claims := SeaweedFileIdClaims{ fileId, - jwt.StandardClaims{}, + jwt.RegisteredClaims{}, } if expiresAfterSec > 0 { - claims.ExpiresAt = time.Now().Add(time.Second * time.Duration(expiresAfterSec)).Unix() + claims.ExpiresAt = jwt.NewNumericDate(time.Now().Add(time.Second * time.Duration(expiresAfterSec))) } t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) encoded, e := t.SignedString([]byte(signingKey)) @@ -56,10 +56,10 @@ func GenJwtForFilerServer(signingKey SigningKey, expiresAfterSec int) EncodedJwt } claims := SeaweedFilerClaims{ - jwt.StandardClaims{}, + jwt.RegisteredClaims{}, } if expiresAfterSec > 0 { - claims.ExpiresAt = time.Now().Add(time.Second * time.Duration(expiresAfterSec)).Unix() + claims.ExpiresAt = jwt.NewNumericDate(time.Now().Add(time.Second * time.Duration(expiresAfterSec))) } t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) encoded, e := t.SignedString([]byte(signingKey)) diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 58215a927..8a58e287c 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -87,7 +87,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) t := &filer_pb.GetFilerConfigurationResponse{ - Masters: pb.ToAddressStringsFromMap(fs.option.Masters), + Masters: fs.option.Masters.GetInstancesAsStrings(), Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 8a6d341bb..98784bce3 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -50,7 +50,7 @@ import ( ) type FilerOption struct { - Masters map[string]pb.ServerAddress + Masters *pb.ServerDiscovery FilerGroup string Collection string DefaultReplication string @@ -94,6 +94,9 @@ type FilerServer struct { // track known metadata listeners knownListenersLock sync.Mutex knownListeners map[int32]int32 + + // client to assign file id + assignProxy *operation.AssignProxy } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -115,11 +118,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) } fs.listenersCond = sync.NewCond(&fs.listenersLock) - if len(option.Masters) == 0 { + option.Masters.RefreshBySrvIfAvailable() + if len(option.Masters.GetInstances()) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher @@ -131,6 +135,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepMasterClientConnected() + fs.assignProxy, err = operation.NewAssignProxy(fs.filer.GetMaster, fs.grpcDialOption, 16) + if !util.LoadConfiguration("filer", false) { v.SetDefault("leveldb2.enabled", true) v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) @@ -183,14 +189,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot) - return fs, nil + return fs, err } func (fs *FilerServer) checkWithMaster() { isConnected := false for !isConnected { - for _, master := range fs.option.Masters { + fs.option.Masters.RefreshBySrvIfAvailable() + for _, master := range fs.option.Masters.GetInstances() { readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 898975d14..daf63fa8d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -43,7 +43,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) + assignResult, ae := fs.assignProxy.Assign(ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go new file mode 100644 index 000000000..34e85d752 --- /dev/null +++ b/weed/server/master_grpc_server_assign.go @@ -0,0 +1,122 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "time" + + "github.com/seaweedfs/raft" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer) error { + for { + req, err := server.Recv() + if err != nil { + glog.Errorf("StreamAssign failed to receive: %v", err) + return err + } + resp, err := ms.Assign(context.Background(), req) + if err != nil { + glog.Errorf("StreamAssign failed to assign: %v", err) + return err + } + if err = server.Send(resp); err != nil { + glog.Errorf("StreamAssign failed to send: %v", err) + return err + } + } +} +func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + if req.Count == 0 { + req.Count = 1 + } + + if req.Replication == "" { + req.Replication = ms.option.DefaultReplicaPlacement + } + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) + if err != nil { + return nil, err + } + ttl, err := needle.ReadTTL(req.Ttl) + if err != nil { + return nil, err + } + diskType := types.ToDiskType(req.DiskType) + + option := &topology.VolumeGrowOption{ + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + DiskType: diskType, + Preallocate: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, + } + + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { + if ms.Topo.AvailableSpaceFor(option) <= 0 { + return nil, fmt.Errorf("no free volumes left for " + option.String()) + } + vl.AddGrowRequest() + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: int(req.WritableVolumeCount), + } + } + + var ( + lastErr error + maxTimeout = time.Second * 10 + startTime = time.Now() + ) + + for time.Now().Sub(startTime) < maxTimeout { + fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) + if err == nil { + dn := dnList.Head() + var replicas []*master_pb.Location + for _, r := range dnList.Rest() { + replicas = append(replicas, &master_pb.Location{ + Url: r.Url(), + PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), + DataCenter: r.GetDataCenterId(), + }) + } + return &master_pb.AssignResponse{ + Fid: fid, + Location: &master_pb.Location{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + DataCenter: dn.GetDataCenterId(), + }, + Count: count, + Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Replicas: replicas, + }, nil + } + //glog.V(4).Infoln("waiting for volume growing...") + lastErr = err + time.Sleep(200 * time.Millisecond) + } + return nil, lastErr +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 87c7b9990..4fa6406a7 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -16,7 +16,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/topology" ) func (ms *MasterServer) ProcessGrowRequest() { @@ -113,93 +112,6 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV return resp, nil } -func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { - - if !ms.Topo.IsLeader() { - return nil, raft.NotLeaderError - } - - if req.Count == 0 { - req.Count = 1 - } - - if req.Replication == "" { - req.Replication = ms.option.DefaultReplicaPlacement - } - replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) - if err != nil { - return nil, err - } - ttl, err := needle.ReadTTL(req.Ttl) - if err != nil { - return nil, err - } - diskType := types.ToDiskType(req.DiskType) - - option := &topology.VolumeGrowOption{ - Collection: req.Collection, - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - DiskType: diskType, - Preallocate: ms.preallocateSize, - DataCenter: req.DataCenter, - Rack: req.Rack, - DataNode: req.DataNode, - MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, - } - - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - - if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { - if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for " + option.String()) - } - vl.AddGrowRequest() - ms.vgCh <- &topology.VolumeGrowRequest{ - Option: option, - Count: int(req.WritableVolumeCount), - } - } - - var ( - lastErr error - maxTimeout = time.Second * 10 - startTime = time.Now() - ) - - for time.Now().Sub(startTime) < maxTimeout { - fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) - if err == nil { - dn := dnList.Head() - var replicas []*master_pb.Location - for _, r := range dnList.Rest() { - replicas = append(replicas, &master_pb.Location{ - Url: r.Url(), - PublicUrl: r.PublicUrl, - GrpcPort: uint32(r.GrpcPort), - DataCenter: r.GetDataCenterId(), - }) - } - return &master_pb.AssignResponse{ - Fid: fid, - Location: &master_pb.Location{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - GrpcPort: uint32(dn.GrpcPort), - DataCenter: dn.GetDataCenterId(), - }, - Count: count, - Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), - Replicas: replicas, - }, nil - } - //glog.V(4).Infoln("waiting for volume growing...") - lastErr = err - time.Sleep(200 * time.Millisecond) - } - return nil, lastErr -} - func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) { if !ms.Topo.IsLeader() { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 2489aaefd..9a5313a10 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -110,7 +110,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)), adminLocks: NewAdminLocks(), Cluster: cluster.NewCluster(), } diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b1722edfb..e6e582376 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -51,7 +51,7 @@ var ( func NewCommandEnv(options *ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell") diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index dda4d95e5..3dda42423 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -299,7 +299,6 @@ func JoinHostPort(host string, port int) string { return net.JoinHostPort(host, portStr) } - func StartMetricsServer(ip string, port int) { if port == 0 { return diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index 7d68de2e6..a46643f57 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ b/weed/storage/erasure_coding/ec_encoder.go @@ -79,7 +79,7 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, } glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size()) - err = encodeDatFile(fi.Size(), err, baseFileName, bufferSize, largeBlockSize, file, smallBlockSize) + err = encodeDatFile(fi.Size(), baseFileName, bufferSize, largeBlockSize, file, smallBlockSize) if err != nil { return fmt.Errorf("encodeDatFile: %v", err) } @@ -195,7 +195,7 @@ func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blo return nil } -func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error { +func encodeDatFile(remainingSize int64, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error { var processedSize int64 diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go index e66a83200..aa46cf6d4 100644 --- a/weed/storage/erasure_coding/ec_test.go +++ b/weed/storage/erasure_coding/ec_test.go @@ -43,10 +43,10 @@ func TestEncodingDecoding(t *testing.T) { func validateFiles(baseFileName string) error { nm, err := readNeedleMap(baseFileName) - defer nm.Close() if err != nil { return fmt.Errorf("readNeedleMap: %v", err) } + defer nm.Close() datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0) if err != nil { @@ -60,6 +60,9 @@ func validateFiles(baseFileName string) error { } ecFiles, err := openEcFiles(baseFileName, true) + if err != nil { + return fmt.Errorf("error opening ec files: %w", err) + } defer closeEcFiles(ecFiles) err = nm.AscendingVisit(func(value needle_map.NeedleValue) error { diff --git a/weed/util/constants.go b/weed/util/constants.go index 7123cc3b7..f6f282bdc 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION_NUMBER = fmt.Sprintf("%.02f", 3.55) + VERSION_NUMBER = fmt.Sprintf("%.02f", 3.56) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" ) diff --git a/weed/util/http_util.go b/weed/util/http_util.go index bb1a32ede..ef4b29158 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -54,6 +54,9 @@ func Post(url string, values url.Values) ([]byte, error) { func Get(url string) ([]byte, bool, error) { request, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, true, err + } request.Header.Add("Accept-Encoding", "gzip") response, err := client.Do(request) @@ -66,6 +69,9 @@ func Get(url string) ([]byte, bool, error) { switch response.Header.Get("Content-Encoding") { case "gzip": reader, err = gzip.NewReader(response.Body) + if err != nil { + return nil, true, err + } defer reader.Close() default: reader = response.Body @@ -253,6 +259,9 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) + if err != nil { + return 0, err + } defer reader.Close() default: reader = r.Body @@ -400,6 +409,9 @@ func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*htt switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) + if err != nil { + return nil, nil, err + } default: reader = r.Body } diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index c693df582..a6ddf22f3 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -24,8 +24,8 @@ type MasterClient struct { rack string currentMaster pb.ServerAddress currentMasterLock sync.RWMutex - masters map[string]pb.ServerAddress - grpcDialOption grpc.DialOption + masters pb.ServerDiscovery + grpcDialOption grpc.DialOption *vidMap vidMapCacheSize int @@ -33,7 +33,7 @@ type MasterClient struct { OnPeerUpdateLock sync.RWMutex } -func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { +func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { return &MasterClient{ FilerGroup: filerGroup, clientType: clientType, @@ -108,9 +108,9 @@ func (mc *MasterClient) GetMaster() pb.ServerAddress { return mc.getCurrentMaster() } -func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { +func (mc *MasterClient) GetMasters() []pb.ServerAddress { mc.WaitUntilConnected() - return mc.masters + return mc.masters.GetInstances() } func (mc *MasterClient) WaitUntilConnected() { @@ -132,7 +132,7 @@ func (mc *MasterClient) KeepConnectedToMaster() { } func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) { - for _, master := range mc.masters { + for _, master := range mc.masters.GetInstances() { if master == myMasterAddress { continue } @@ -159,7 +159,8 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres func (mc *MasterClient) tryAllMasters() { var nextHintedLeader pb.ServerAddress - for _, master := range mc.masters { + mc.masters.RefreshBySrvIfAvailable() + for _, master := range mc.masters.GetInstances() { nextHintedLeader = mc.tryConnectToMaster(master) for nextHintedLeader != "" { nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index 980e5bd8c..a734c6b0c 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -3,6 +3,7 @@ package wdclient import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" "strconv" "sync" @@ -65,7 +66,7 @@ func TestLocationIndex(t *testing.T) { } func TestLookupFileId(t *testing.T) { - mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil) + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{}) length := 5 //Construct a cache linked list of length 5 @@ -135,7 +136,7 @@ func TestLookupFileId(t *testing.T) { } func TestConcurrentGetLocations(t *testing.T) { - mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil) + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{}) location := Location{Url: "TestDataRacing"} mc.addLocation(1, location) |
