aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/assign_file_id.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-08-23 00:31:33 -0700
committerchrislu <chris.lu@gmail.com>2023-08-23 00:31:33 -0700
commit99f037b958b5952628d281461d3bfb76fa433d8c (patch)
tree7b82cb004ef223e7c3a63a1aa8a3172c6ac060f8 /weed/operation/assign_file_id.go
parenteac92c334a7b7222b5289cb04733ec87abf36075 (diff)
downloadseaweedfs-99f037b958b5952628d281461d3bfb76fa433d8c.tar.xz
seaweedfs-99f037b958b5952628d281461d3bfb76fa433d8c.zip
streaming assign file ids
Diffstat (limited to 'weed/operation/assign_file_id.go')
-rw-r--r--weed/operation/assign_file_id.go105
1 files changed, 102 insertions, 3 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index c2f5a806d..a4753d234 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -4,11 +4,10 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
-
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
)
type VolumeAssignRequest struct {
@@ -34,6 +33,106 @@ type AssignResult struct {
Replicas []Location `json:"replicas,omitempty"`
}
+// This is a proxy to the master server, only for assigning volume ids.
+// It runs via grpc to the master server in streaming mode.
+// The connection to the master would only be re-established when the last connection has error.
+type AssignProxy struct {
+ grpcConnection *grpc.ClientConn
+ pool chan *singleThreadAssignProxy
+}
+
+func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concurrency int) (ap *AssignProxy, err error) {
+ ap = &AssignProxy{
+ pool: make(chan *singleThreadAssignProxy, concurrency),
+ }
+ ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
+ if err != nil {
+ return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
+ }
+ for i := 0; i < concurrency; i++ {
+ ap.pool <- &singleThreadAssignProxy{}
+ }
+ return ap, nil
+}
+
+func (ap *AssignProxy) Assign(primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
+ p := <-ap.pool
+ defer func() {
+ ap.pool <- p
+ }()
+
+ return p.doAssign(ap.grpcConnection, primaryRequest, alternativeRequests...)
+}
+
+type singleThreadAssignProxy struct {
+ assignClient master_pb.Seaweed_StreamAssignClient
+}
+
+func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (ret *AssignResult, err error) {
+ if ap.assignClient == nil {
+ client := master_pb.NewSeaweedClient(grpcConnection)
+ ap.assignClient, err = client.StreamAssign(context.Background())
+ if err != nil {
+ ap.assignClient = nil
+ return nil, fmt.Errorf("fail to create stream assign client: %v", err)
+ }
+ }
+
+ var requests []*VolumeAssignRequest
+ requests = append(requests, primaryRequest)
+ requests = append(requests, alternativeRequests...)
+ ret = &AssignResult{}
+
+ for _, request := range requests {
+ if request == nil {
+ continue
+ }
+ req := &master_pb.AssignRequest{
+ Count: request.Count,
+ Replication: request.Replication,
+ Collection: request.Collection,
+ Ttl: request.Ttl,
+ DiskType: request.DiskType,
+ DataCenter: request.DataCenter,
+ Rack: request.Rack,
+ DataNode: request.DataNode,
+ WritableVolumeCount: request.WritableVolumeCount,
+ }
+ if err = ap.assignClient.Send(req); err != nil {
+ return nil, fmt.Errorf("StreamAssignSend: %v", err)
+ }
+ resp, grpcErr := ap.assignClient.Recv()
+ if grpcErr != nil {
+ return nil, grpcErr
+ }
+ if resp.Error != "" {
+ return nil, fmt.Errorf("StreamAssignRecv: %v", resp.Error)
+ }
+
+ ret.Count = resp.Count
+ ret.Fid = resp.Fid
+ ret.Url = resp.Location.Url
+ ret.PublicUrl = resp.Location.PublicUrl
+ ret.GrpcPort = int(resp.Location.GrpcPort)
+ ret.Error = resp.Error
+ ret.Auth = security.EncodedJwt(resp.Auth)
+ for _, r := range resp.Replicas {
+ ret.Replicas = append(ret.Replicas, Location{
+ Url: r.Url,
+ PublicUrl: r.PublicUrl,
+ DataCenter: r.DataCenter,
+ })
+ }
+
+ if ret.Count <= 0 {
+ continue
+ }
+ break
+ }
+
+ return
+}
+
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest