aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient')
-rw-r--r--weed/wdclient/masterclient.go95
-rw-r--r--weed/wdclient/vid_map.go99
-rw-r--r--weed/wdclient/wdclient.go15
3 files changed, 209 insertions, 0 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
new file mode 100644
index 000000000..347901f1a
--- /dev/null
+++ b/weed/wdclient/masterclient.go
@@ -0,0 +1,95 @@
+package wdclient
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MasterClient struct {
+ ctx context.Context
+ name string
+ currentMaster string
+ masters []string
+
+ vidMap
+}
+
+func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
+ return &MasterClient{
+ ctx: ctx,
+ name: clientName,
+ masters: masters,
+ vidMap: newVidMap(),
+ }
+}
+
+func (mc *MasterClient) GetMaster() string {
+ return mc.currentMaster
+}
+
+func (mc *MasterClient) KeepConnectedToMaster() {
+ glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
+ for {
+ mc.tryAllMasters()
+ time.Sleep(time.Second)
+ }
+}
+
+func (mc *MasterClient) tryAllMasters() {
+ for _, master := range mc.masters {
+ glog.V(0).Infof("Connecting to %v", master)
+ withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ stream, err := client.KeepConnected(context.Background())
+ if err != nil {
+ glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
+ return err
+ }
+
+ glog.V(0).Infof("Connected to %v", master)
+ mc.currentMaster = master
+
+ if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
+ glog.V(0).Infof("failed to send to %s: %v", master, err)
+ return err
+ }
+
+ for {
+ if volumeLocation, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("failed to receive from %s: %v", master, err)
+ return err
+ } else {
+ glog.V(0).Infof("volume location: %+v", volumeLocation)
+ loc := Location{
+ Url: volumeLocation.Url,
+ PublicUrl: volumeLocation.PublicUrl,
+ }
+ for _, newVid := range volumeLocation.NewVids {
+ mc.addLocation(newVid, loc)
+ }
+ for _, deletedVid := range volumeLocation.DeletedVids {
+ mc.deleteLocation(deletedVid, loc)
+ }
+ }
+ }
+ })
+ mc.currentMaster = ""
+ }
+}
+
+func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ grpcConnection, err := util.GrpcDial(master)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", master, err)
+ }
+ defer grpcConnection.Close()
+
+ client := master_pb.NewSeaweedClient(grpcConnection)
+
+ return fn(client)
+}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
new file mode 100644
index 000000000..2f56b8fce
--- /dev/null
+++ b/weed/wdclient/vid_map.go
@@ -0,0 +1,99 @@
+package wdclient
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type Location struct {
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+}
+
+type vidMap struct {
+ sync.RWMutex
+ vid2Locations map[uint32][]Location
+}
+
+func newVidMap() vidMap {
+ return vidMap{
+ vid2Locations: make(map[uint32][]Location),
+ }
+}
+
+func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+ id, err := strconv.Atoi(vid)
+ if err != nil {
+ glog.V(1).Infof("Unknown volume id %s", vid)
+ return "", err
+ }
+
+ locations := vc.GetLocations(uint32(id))
+ if len(locations) == 0 {
+ return "", fmt.Errorf("volume %d not found", id)
+ }
+
+ return locations[rand.Intn(len(locations))].Url, nil
+}
+
+func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return "", errors.New("Invalid fileId " + fileId)
+ }
+ serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
+ if lookupError != nil {
+ return "", lookupError
+ }
+ return "http://" + serverUrl + "/" + fileId, nil
+}
+
+func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
+ vc.RLock()
+ defer vc.RUnlock()
+
+ return vc.vid2Locations[vid]
+}
+
+func (vc *vidMap) addLocation(vid uint32, location Location) {
+ vc.Lock()
+ defer vc.Unlock()
+
+ locations, found := vc.vid2Locations[vid]
+ if !found {
+ vc.vid2Locations[vid] = []Location{location}
+ return
+ }
+
+ for _, loc := range locations {
+ if loc.Url == location.Url {
+ return
+ }
+ }
+
+ vc.vid2Locations[vid] = append(locations, location)
+
+}
+
+func (vc *vidMap) deleteLocation(vid uint32, location Location) {
+ vc.Lock()
+ defer vc.Unlock()
+
+ locations, found := vc.vid2Locations[vid]
+ if !found {
+ return
+ }
+
+ for i, loc := range locations {
+ if loc.Url == location.Url {
+ vc.vid2Locations[vid] = append(locations[0:i], locations[i+1:]...)
+ }
+ }
+
+}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
new file mode 100644
index 000000000..722f4d061
--- /dev/null
+++ b/weed/wdclient/wdclient.go
@@ -0,0 +1,15 @@
+package wdclient
+
+import (
+ "context"
+)
+
+type SeaweedClient struct {
+ *MasterClient
+}
+
+func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
+ return &SeaweedClient{
+ MasterClient: NewMasterClient(ctx, clientName, masters),
+ }
+}