diff options
Diffstat (limited to 'weed/server/master_grpc_server_assign.go')
| -rw-r--r-- | weed/server/master_grpc_server_assign.go | 122 |
1 files changed, 122 insertions, 0 deletions
diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go new file mode 100644 index 000000000..34e85d752 --- /dev/null +++ b/weed/server/master_grpc_server_assign.go @@ -0,0 +1,122 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "time" + + "github.com/seaweedfs/raft" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +func (ms *MasterServer) StreamAssign(server master_pb.Seaweed_StreamAssignServer) error { + for { + req, err := server.Recv() + if err != nil { + glog.Errorf("StreamAssign failed to receive: %v", err) + return err + } + resp, err := ms.Assign(context.Background(), req) + if err != nil { + glog.Errorf("StreamAssign failed to assign: %v", err) + return err + } + if err = server.Send(resp); err != nil { + glog.Errorf("StreamAssign failed to send: %v", err) + return err + } + } +} +func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + if req.Count == 0 { + req.Count = 1 + } + + if req.Replication == "" { + req.Replication = ms.option.DefaultReplicaPlacement + } + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) + if err != nil { + return nil, err + } + ttl, err := needle.ReadTTL(req.Ttl) + if err != nil { + return nil, err + } + diskType := types.ToDiskType(req.DiskType) + + option := &topology.VolumeGrowOption{ + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + DiskType: diskType, + Preallocate: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, + } + + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { + if ms.Topo.AvailableSpaceFor(option) <= 0 { + return nil, fmt.Errorf("no free volumes left for " + option.String()) + } + vl.AddGrowRequest() + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: int(req.WritableVolumeCount), + } + } + + var ( + lastErr error + maxTimeout = time.Second * 10 + startTime = time.Now() + ) + + for time.Now().Sub(startTime) < maxTimeout { + fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) + if err == nil { + dn := dnList.Head() + var replicas []*master_pb.Location + for _, r := range dnList.Rest() { + replicas = append(replicas, &master_pb.Location{ + Url: r.Url(), + PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), + DataCenter: r.GetDataCenterId(), + }) + } + return &master_pb.AssignResponse{ + Fid: fid, + Location: &master_pb.Location{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + DataCenter: dn.GetDataCenterId(), + }, + Count: count, + Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Replicas: replicas, + }, nil + } + //glog.V(4).Infoln("waiting for volume growing...") + lastErr = err + time.Sleep(200 * time.Millisecond) + } + return nil, lastErr +} |
