aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/masterclient.go
blob: f197fa6f2bb0505150d0522c3ebe6484b027428b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package wdclient

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"google.golang.org/grpc"
)

// MasterClient keeps track of which master, out of a configured list, this
// client is currently connected to, and embeds a vidMap that is updated from
// the volume location messages received over the KeepConnected stream.
type MasterClient struct {
	// name identifies this client; it is sent in KeepConnectedRequest and
	// prefixed to log messages.
	name           string
	// currentMaster is the master currently connected to, or "" while no
	// connection is established.
	currentMaster  string
	// masters is the list of candidate masters, tried in order.
	masters        []string
	// grpcDialOption is passed along whenever a gRPC client for a master is
	// obtained.
	grpcDialOption grpc.DialOption

	// vidMap is embedded so its location methods are available on the client;
	// it is replaced with a fresh map by tryAllMasters after a connection
	// attempt ends.
	vidMap
}

// NewMasterClient builds a MasterClient named clientName that will use
// grpcDialOption when talking to any of the given masters. The client starts
// with an empty volume location map and no current master.
func NewMasterClient(grpcDialOption grpc.DialOption, clientName string, masters []string) *MasterClient {
	mc := new(MasterClient)
	mc.name = clientName
	mc.masters = masters
	mc.grpcDialOption = grpcDialOption
	mc.vidMap = newVidMap()
	return mc
}

// GetMaster reports the master this client is currently connected to. The
// result is the empty string while no connection is established.
func (mc *MasterClient) GetMaster() string {
	current := mc.currentMaster
	return current
}

// WaitUntilConnected blocks the caller until a current master has been
// recorded, polling with a random pause of less than 200ms between checks.
func (mc *MasterClient) WaitUntilConnected() {
	for {
		if mc.currentMaster != "" {
			return
		}
		pause := time.Duration(rand.Int31n(200)) * time.Millisecond
		time.Sleep(pause)
	}
}

// KeepConnectedToMaster never returns: it cycles through all configured
// masters over and over, pausing one second after each full pass.
func (mc *MasterClient) KeepConnectedToMaster() {
	const retryDelay = time.Second

	glog.V(1).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
	for ; ; time.Sleep(retryDelay) {
		mc.tryAllMasters()
	}
}

// tryAllMasters walks the configured masters in order. For each one it
// connects, then keeps reconnecting to whatever leader the previous
// connection hinted at until no further hint is given. After each such chain
// ends the current master is cleared and the volume location map is replaced
// with an empty one.
func (mc *MasterClient) tryAllMasters() {
	for _, seed := range mc.masters {
		for leader := mc.tryConnectToMaster(seed); leader != ""; leader = mc.tryConnectToMaster(leader) {
			// Keep following leader hints; the loop header does all the work.
		}

		// The connection is gone, so forget the master and its locations.
		mc.currentMaster = ""
		mc.vidMap = newVidMap()
	}
}

// tryConnectToMaster opens a KeepConnected stream to master and consumes
// volume location updates from it until the stream fails or the master points
// at a different leader.
//
// Once the initial request has been sent, mc.currentMaster is set to master,
// and the new and deleted volume ids of every received message are applied to
// the embedded vidMap.
//
// It returns the leader address hinted by the master when a redirect is
// received, and "" when the connection could not be established or ended with
// an error.
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
	glog.V(1).Infof("%s Connecting to master %v", mc.name, master)
	grpcErr := withMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {

		// The gRPC connection comes from util.WithCachedGrpcClient, so it is
		// presumably kept open after this callback returns. Cancel the stream
		// context on every exit path; otherwise leaving the receive loop on a
		// leader redirect (which returns nil while the stream is still live)
		// would leak the stream and its resources.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := client.KeepConnected(ctx)
		if err != nil {
			glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.name, master, err)
			return err
		}

		// Announce this client to the master before listening for updates.
		if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.name}); err != nil {
			glog.V(0).Infof("%s failed to send to %s: %v", mc.name, master, err)
			return err
		}

		glog.V(1).Infof("%s Connected to %v", mc.name, master)
		mc.currentMaster = master

		for {
			volumeLocation, err := stream.Recv()
			if err != nil {
				glog.V(0).Infof("%s failed to receive from %s: %v", mc.name, master, err)
				return err
			}

			// maybe the leader is changed
			if volumeLocation.Leader != "" {
				glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader)
				nextHintedLeader = volumeLocation.Leader
				return nil
			}

			// process new volume location
			loc := Location{
				Url:       volumeLocation.Url,
				PublicUrl: volumeLocation.PublicUrl,
			}
			for _, newVid := range volumeLocation.NewVids {
				glog.V(1).Infof("%s: %s adds volume %d", mc.name, loc.Url, newVid)
				mc.addLocation(newVid, loc)
			}
			for _, deletedVid := range volumeLocation.DeletedVids {
				glog.V(1).Infof("%s: %s removes volume %d", mc.name, loc.Url, deletedVid)
				mc.deleteLocation(deletedVid, loc)
			}
		}

	})
	if grpcErr != nil {
		glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, grpcErr)
	}
	return nextHintedLeader
}

// withMasterClient resolves master to its gRPC address, obtains a cached gRPC
// connection for that address using grpcDialOption, and runs fn with a
// SeaweedClient built on top of that connection.
//
// It returns an error when the address cannot be parsed; otherwise it returns
// whatever util.WithCachedGrpcClient returns.
func withMasterClient(master string, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error {
	grpcAddress, err := util.ParseServerToGrpcAddress(master)
	if err != nil {
		return fmt.Errorf("failed to parse master grpc %v: %v", master, err)
	}

	// Adapt fn, which wants a typed client, to the raw-connection callback
	// expected by the cached-connection helper.
	runWithConn := func(conn *grpc.ClientConn) error {
		return fn(master_pb.NewSeaweedClient(conn))
	}
	return util.WithCachedGrpcClient(runWithConn, grpcAddress, grpcDialOption)
}

// WithClient runs fn with a SeaweedClient for the master this client is
// currently connected to, using the client's own dial option, and returns the
// resulting error.
func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
	target, dialOption := mc.currentMaster, mc.grpcDialOption
	return withMasterClient(target, dialOption, fn)
}