diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2018-07-28 21:03:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-07-28 21:03:29 -0700 |
| commit | 452bd0b01393e53e958fb9825bf1f27e6b3522df (patch) | |
| tree | e1a61e592118f9696b7f51501d3b3fd0f6c3eeb5 /weed/wdclient | |
| parent | 97603d6e176dd2b9f2aebd9f6122a8c60481463a (diff) | |
| parent | d3205a007071f26587affb416f71b5c63854b863 (diff) | |
| download | seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.tar.xz seaweedfs-452bd0b01393e53e958fb9825bf1f27e6b3522df.zip | |
Merge pull request #702 from chrislusf/add_topo_listener
Add volume id location change listener
Diffstat (limited to 'weed/wdclient')
| -rw-r--r-- | weed/wdclient/masterclient.go | 95 | ||||
| -rw-r--r-- | weed/wdclient/vid_map.go | 99 | ||||
| -rw-r--r-- | weed/wdclient/wdclient.go | 15 |
3 files changed, 209 insertions, 0 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go new file mode 100644 index 000000000..347901f1a --- /dev/null +++ b/weed/wdclient/masterclient.go @@ -0,0 +1,95 @@ +package wdclient + +import ( + "context" + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MasterClient struct { + ctx context.Context + name string + currentMaster string + masters []string + + vidMap +} + +func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient { + return &MasterClient{ + ctx: ctx, + name: clientName, + masters: masters, + vidMap: newVidMap(), + } +} + +func (mc *MasterClient) GetMaster() string { + return mc.currentMaster +} + +func (mc *MasterClient) KeepConnectedToMaster() { + glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters) + for { + mc.tryAllMasters() + time.Sleep(time.Second) + } +} + +func (mc *MasterClient) tryAllMasters() { + for _, master := range mc.masters { + glog.V(0).Infof("Connecting to %v", master) + withMasterClient(master, func(client master_pb.SeaweedClient) error { + stream, err := client.KeepConnected(context.Background()) + if err != nil { + glog.V(0).Infof("failed to keep connected to %s: %v", master, err) + return err + } + + glog.V(0).Infof("Connected to %v", master) + mc.currentMaster = master + + if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { + glog.V(0).Infof("failed to send to %s: %v", master, err) + return err + } + + for { + if volumeLocation, err := stream.Recv(); err != nil { + glog.V(0).Infof("failed to receive from %s: %v", master, err) + return err + } else { + glog.V(0).Infof("volume location: %+v", volumeLocation) + loc := Location{ + Url: volumeLocation.Url, + PublicUrl: volumeLocation.PublicUrl, + } + for _, newVid := range volumeLocation.NewVids { + mc.addLocation(newVid, loc) + } + for _, deletedVid := range volumeLocation.DeletedVids { + mc.deleteLocation(deletedVid, loc) + } + } + } + }) + mc.currentMaster = "" + } +} + +func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + grpcConnection, err := util.GrpcDial(master) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", master, err) + } + defer grpcConnection.Close() + + client := master_pb.NewSeaweedClient(grpcConnection) + + return fn(client) +} diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go new file mode 100644 index 000000000..2f56b8fce --- /dev/null +++ b/weed/wdclient/vid_map.go @@ -0,0 +1,99 @@ +package wdclient + +import ( + "errors" + "fmt" + "math/rand" + "strconv" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type Location struct { + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` +} + +type vidMap struct { + sync.RWMutex + vid2Locations map[uint32][]Location +} + +func newVidMap() vidMap { + return vidMap{ + vid2Locations: make(map[uint32][]Location), + } +} + +func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { + id, err := strconv.Atoi(vid) + if err != nil { + glog.V(1).Infof("Unknown volume id %s", vid) + return "", err + } + + locations := vc.GetLocations(uint32(id)) + if len(locations) == 0 { + return "", fmt.Errorf("volume %d not found", id) + } + + return locations[rand.Intn(len(locations))].Url, nil +} + +func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return "", errors.New("Invalid fileId " + fileId) + } + serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0]) + if lookupError != nil { + return "", lookupError + } + return "http://" + serverUrl + "/" + fileId, nil +} + +func (vc *vidMap) GetLocations(vid uint32) (locations []Location) { + vc.RLock() + defer vc.RUnlock() + + return vc.vid2Locations[vid] +} + +func (vc *vidMap) addLocation(vid uint32, location Location) { + vc.Lock() + defer vc.Unlock() + + locations, found := vc.vid2Locations[vid] + if !found { + vc.vid2Locations[vid] = []Location{location} + return + } + + for _, loc := range locations { + if loc.Url == location.Url { + return + } + } + + vc.vid2Locations[vid] = append(locations, location) + +} + +func (vc *vidMap) deleteLocation(vid uint32, location Location) { + vc.Lock() + defer vc.Unlock() + + locations, found := vc.vid2Locations[vid] + if !found { + return + } + + for i, loc := range locations { + if loc.Url == location.Url { + vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...) + } + } + +} diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go new file mode 100644 index 000000000..722f4d061 --- /dev/null +++ b/weed/wdclient/wdclient.go @@ -0,0 +1,15 @@ +package wdclient + +import ( + "context" +) + +type SeaweedClient struct { + *MasterClient +} + +func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient { + return &SeaweedClient{ + MasterClient: NewMasterClient(ctx, clientName, masters), + } +} |
