1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
package wdclient
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
)
type MasterClient struct {
ctx context.Context
name string
currentMaster string
masters []string
grpcDialOption grpc.DialOption
vidMap
}
func NewMasterClient(ctx context.Context, grpcDialOption grpc.DialOption, clientName string, masters []string) *MasterClient {
return &MasterClient{
ctx: ctx,
name: clientName,
masters: masters,
grpcDialOption: grpcDialOption,
vidMap: newVidMap(),
}
}
func (mc *MasterClient) GetMaster() string {
return mc.currentMaster
}
func (mc *MasterClient) WaitUntilConnected() {
for mc.currentMaster == "" {
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
}
}
func (mc *MasterClient) KeepConnectedToMaster() {
glog.V(1).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
for {
mc.tryAllMasters()
time.Sleep(time.Second)
}
}
func (mc *MasterClient) tryAllMasters() {
nextHintedLeader := ""
for _, master := range mc.masters {
nextHintedLeader = mc.tryConnectToMaster(master)
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
}
mc.currentMaster = ""
mc.vidMap = newVidMap()
}
}
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
glog.V(1).Infof("%s Connecting to master %v", mc.name, master)
gprcErr := withMasterClient(context.Background(), master, mc.grpcDialOption, func(ctx context.Context, client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.name, master, err)
return err
}
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.name}); err != nil {
glog.V(0).Infof("%s failed to send to %s: %v", mc.name, master, err)
return err
}
glog.V(1).Infof("%s Connected to %v", mc.name, master)
mc.currentMaster = master
for {
volumeLocation, err := stream.Recv()
if err != nil {
glog.V(0).Infof("%s failed to receive from %s: %v", mc.name, master, err)
return err
}
// maybe the leader is changed
if volumeLocation.Leader != "" {
glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader)
nextHintedLeader = volumeLocation.Leader
return nil
}
// process new volume location
loc := Location{
Url: volumeLocation.Url,
PublicUrl: volumeLocation.PublicUrl,
}
for _, newVid := range volumeLocation.NewVids {
glog.V(1).Infof("%s: %s adds volume %d", mc.name, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range volumeLocation.DeletedVids {
glog.V(1).Infof("%s: %s removes volume %d", mc.name, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
}
})
if gprcErr != nil {
glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr)
}
return
}
func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.DialOption, fn func(ctx context.Context, client master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
if parseErr != nil {
return fmt.Errorf("failed to parse master grpc %v: %v", master, parseErr)
}
return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(ctx, client)
}, masterGrpcAddress, grpcDialOption)
}
func (mc *MasterClient) WithClient(ctx context.Context, fn func(client master_pb.SeaweedClient) error) error {
return withMasterClient(ctx, mc.currentMaster, mc.grpcDialOption, func(ctx context.Context, client master_pb.SeaweedClient) error {
return fn(client)
})
}
|