diff options
| author | chrislu <chris.lu@gmail.com> | 2023-08-23 00:31:33 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-08-23 00:31:33 -0700 |
| commit | 99f037b958b5952628d281461d3bfb76fa433d8c (patch) | |
| tree | 7b82cb004ef223e7c3a63a1aa8a3172c6ac060f8 /weed/operation/assign_file_id.go | |
| parent | eac92c334a7b7222b5289cb04733ec87abf36075 (diff) | |
| download | seaweedfs-99f037b958b5952628d281461d3bfb76fa433d8c.tar.xz seaweedfs-99f037b958b5952628d281461d3bfb76fa433d8c.zip | |
streaming assign file ids
Diffstat (limited to 'weed/operation/assign_file_id.go')
| -rw-r--r-- | weed/operation/assign_file_id.go | 105 |
1 files changed, 102 insertions, 3 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index c2f5a806d..a4753d234 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -4,11 +4,10 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" ) type VolumeAssignRequest struct { @@ -34,6 +33,106 @@ type AssignResult struct { Replicas []Location `json:"replicas,omitempty"` } +// This is a proxy to the master server, only for assigning volume ids. +// It runs via grpc to the master server in streaming mode. +// The connection to the master would only be re-established when the last connection has error. +type AssignProxy struct { + grpcConnection *grpc.ClientConn + pool chan *singleThreadAssignProxy +} + +func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) { + ap = &AssignProxy{ + pool: make(chan *singleThreadAssignProxy, concurrency), + } + ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption) + if err != nil { + return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err) + } + for i := 0; i < concurrency; i++ { + ap.pool <- &singleThreadAssignProxy{} + } + return ap, nil +} + +func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + p := <-ap.pool + defer func() { + ap.pool <- p + }() + + return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...) +} + +type singleThreadAssignProxy struct { + assignClient master_pb.Seaweed_StreamAssignClient +} + +func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) { + if ap.assignClient == nil { + client := master_pb.NewSeaweedClient(grpcConnection) + ap.assignClient, err = client.StreamAssign(context.Background()) + if err != nil { + ap.assignClient = nil + return nil, fmt.Errorf("fail to create stream assign client: %v", err) + } + } + + var requests []*VolumeAssignRequest + requests = append(requests, primaryRequest) + requests = append(requests, alternativeRequests...) + ret = &AssignResult{} + + for _, request := range requests { + if request == nil { + continue + } + req := &master_pb.AssignRequest{ + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DiskType: request.DiskType, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, + } + if err = ap.assignClient.Send(req); err != nil { + return nil, fmt.Errorf("StreamAssignSend: %v", err) + } + resp, grpcErr := ap.assignClient.Recv() + if grpcErr != nil { + return nil, grpcErr + } + if resp.Error != "" { + return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error) + } + + ret.Count = resp.Count + ret.Fid = resp.Fid + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) + ret.Error = resp.Error + ret.Auth = security.EncodedJwt(resp.Auth) + for _, r := range resp.Replicas { + ret.Replicas = append(ret.Replicas, Location{ + Url: r.Url, + PublicUrl: r.PublicUrl, + DataCenter: r.DataCenter, + }) + } + + if ret.Count <= 0 { + continue + } + break + } + + return +} + func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest |
