aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/assign_file_id.go
blob: 3f3bb13e0b74722ba825e2b19e16c2314b712042 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package operation

import (
	"bytes"
	"context"
	"fmt"

	"github.com/valyala/fasthttp"
	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)

type VolumeAssignRequest struct {
	Count               uint64
	Replication         string
	Collection          string
	Ttl                 string
	DataCenter          string
	Rack                string
	DataNode            string
	WritableVolumeCount uint32
}

type AssignResult struct {
	Fid       string              `json:"fid,omitempty"`
	Url       string              `json:"url,omitempty"`
	PublicUrl string              `json:"publicUrl,omitempty"`
	Count     uint64              `json:"count,omitempty"`
	Error     string              `json:"error,omitempty"`
	Auth      security.EncodedJwt `json:"auth,omitempty"`
}

func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {

	var requests []*VolumeAssignRequest
	requests = append(requests, primaryRequest)
	requests = append(requests, alternativeRequests...)

	var lastError error
	ret := &AssignResult{}

	for i, request := range requests {
		if request == nil {
			continue
		}

		lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {

			req := &master_pb.AssignRequest{
				Count:               primaryRequest.Count,
				Replication:         primaryRequest.Replication,
				Collection:          primaryRequest.Collection,
				Ttl:                 primaryRequest.Ttl,
				DataCenter:          primaryRequest.DataCenter,
				Rack:                primaryRequest.Rack,
				DataNode:            primaryRequest.DataNode,
				WritableVolumeCount: primaryRequest.WritableVolumeCount,
			}
			resp, grpcErr := masterClient.Assign(context.Background(), req)
			if grpcErr != nil {
				return grpcErr
			}

			ret.Count = resp.Count
			ret.Fid = resp.Fid
			ret.Url = resp.Url
			ret.PublicUrl = resp.PublicUrl
			ret.Error = resp.Error
			ret.Auth = security.EncodedJwt(resp.Auth)

			return nil

		})

		if lastError != nil {
			continue
		}

		if ret.Count <= 0 {
			lastError = fmt.Errorf("assign failure %d: %v", i+1, ret.Error)
			continue
		}

	}

	return ret, lastError
}

func LookupJwt(master string, fileId string) security.EncodedJwt {

	tokenStr := ""
	lookupUrl := fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)

	err := util.Head(lookupUrl, func(header fasthttp.ResponseHeader) {
		bearer := header.Peek("Authorization")
		if len(bearer) > 7 && string(bytes.ToUpper(bearer[0:6])) == "BEARER" {
			tokenStr = string(bearer[7:])
		}
	})
	if err != nil {
		glog.V(0).Infof("failed to lookup jwt %s: %v", lookupUrl, err)
	}

	return security.EncodedJwt(tokenStr)
}