diff options
Diffstat (limited to 'weed/wdclient')
| -rw-r--r-- | weed/wdclient/masterclient.go | 87 | ||||
| -rw-r--r-- | weed/wdclient/vid_map.go | 44 | ||||
| -rw-r--r-- | weed/wdclient/vid_map_test.go | 76 |
3 files changed, 162 insertions, 45 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index e16356e9d..111514f5e 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -51,54 +51,71 @@ func (mc *MasterClient) KeepConnectedToMaster() { } func (mc *MasterClient) tryAllMasters() { + nextHintedLeader := "" for _, master := range mc.masters { - glog.V(1).Infof("%s Connecting to master %v", mc.name, master) - gprcErr := withMasterClient(context.Background(), master, mc.grpcDialOption, func(ctx context.Context, client master_pb.SeaweedClient) error { - stream, err := client.KeepConnected(ctx) + nextHintedLeader = mc.tryConnectToMaster(master) + for nextHintedLeader != "" { + nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) + } + + mc.currentMaster = "" + mc.vidMap = newVidMap() + } +} + +func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) { + glog.V(1).Infof("%s Connecting to master %v", mc.name, master) + gprcErr := withMasterClient(context.Background(), master, mc.grpcDialOption, func(ctx context.Context, client master_pb.SeaweedClient) error { + + stream, err := client.KeepConnected(ctx) + if err != nil { + glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.name, master, err) + return err + } + + if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.name}); err != nil { + glog.V(0).Infof("%s failed to send to %s: %v", mc.name, master, err) + return err + } + + glog.V(1).Infof("%s Connected to %v", mc.name, master) + mc.currentMaster = master + + for { + volumeLocation, err := stream.Recv() if err != nil { - glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.name, master, err) + glog.V(0).Infof("%s failed to receive from %s: %v", mc.name, master, err) return err } - if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { - glog.V(0).Infof("%s failed to send to %s: %v", mc.name, master, err) - return err + // maybe the leader is changed + if volumeLocation.Leader != "" { + glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader) + nextHintedLeader = volumeLocation.Leader + return nil } - if mc.currentMaster == "" { - glog.V(1).Infof("%s Connected to %v", mc.name, master) - mc.currentMaster = master + // process new volume location + loc := Location{ + Url: volumeLocation.Url, + PublicUrl: volumeLocation.PublicUrl, } - - for { - if volumeLocation, err := stream.Recv(); err != nil { - glog.V(0).Infof("%s failed to receive from %s: %v", mc.name, master, err) - return err - } else { - loc := Location{ - Url: volumeLocation.Url, - PublicUrl: volumeLocation.PublicUrl, - } - for _, newVid := range volumeLocation.NewVids { - glog.V(1).Infof("%s: %s adds volume %d", mc.name, loc.Url, newVid) - mc.addLocation(newVid, loc) - } - for _, deletedVid := range volumeLocation.DeletedVids { - glog.V(1).Infof("%s: %s removes volume %d", mc.name, loc.Url, deletedVid) - mc.deleteLocation(deletedVid, loc) - } - } + for _, newVid := range volumeLocation.NewVids { + glog.V(1).Infof("%s: %s adds volume %d", mc.name, loc.Url, newVid) + mc.addLocation(newVid, loc) + } + for _, deletedVid := range volumeLocation.DeletedVids { + glog.V(1).Infof("%s: %s removes volume %d", mc.name, loc.Url, deletedVid) + mc.deleteLocation(deletedVid, loc) } - - }) - - if gprcErr != nil { - glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr) } - mc.currentMaster = "" + }) + if gprcErr != nil { + glog.V(0).Infof("%s failed to connect with master %v: %v", mc.name, master, gprcErr) } + return } func withMasterClient(ctx context.Context, master string, grpcDialOption grpc.DialOption, fn func(ctx context.Context, client master_pb.SeaweedClient) error) error { diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 06308944d..97df49cb6 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -3,15 +3,18 @@ package wdclient import ( "errors" "fmt" - "math/rand" "strconv" "strings" "sync" - "time" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/glog" ) +const ( + maxCursorIndex = 4096 +) + type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` @@ -20,16 +23,27 @@ type Location struct { type vidMap struct { sync.RWMutex vid2Locations map[uint32][]Location - r *rand.Rand + + cursor int32 } func newVidMap() vidMap { return vidMap{ vid2Locations: make(map[uint32][]Location), - r: rand.New(rand.NewSource(time.Now().UnixNano())), + cursor: -1, } } +func (vc *vidMap) getLocationIndex(length int) (int, error) { + if length <= 0 { + return 0, fmt.Errorf("invalid length: %d", length) + } + if atomic.LoadInt32(&vc.cursor) == maxCursorIndex { + atomic.CompareAndSwapInt32(&vc.cursor, maxCursorIndex, -1) + } + return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil +} + func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) { id, err := strconv.Atoi(vid) if err != nil { @@ -64,20 +78,25 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er return serverUrl, nil } -func (vc *vidMap) GetVidLocations(vid string) (locations []Location) { +func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) { id, err := strconv.Atoi(vid) if err != nil { glog.V(1).Infof("Unknown volume id %s", vid) - return nil + return nil, fmt.Errorf("Unknown volume id %s", vid) + } + foundLocations, found := vc.GetLocations(uint32(id)) + if found { + return foundLocations, nil } - return vc.GetLocations(uint32(id)) + return nil, fmt.Errorf("volume id %s not found", vid) } -func (vc *vidMap) GetLocations(vid uint32) (locations []Location) { +func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { vc.RLock() defer vc.RUnlock() - return vc.vid2Locations[vid] + locations, found = vc.vid2Locations[vid] + return } func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { @@ -89,7 +108,12 @@ func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) { return "", fmt.Errorf("volume %d not found", vid) } - return locations[vc.r.Intn(len(locations))].Url, nil + index, err := vc.getLocationIndex(len(locations)) + if err != nil { + return "", fmt.Errorf("volume %d: %v", vid, err) + } + + return locations[index].Url, nil } func (vc *vidMap) addLocation(vid uint32, location Location) { diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go new file mode 100644 index 000000000..87be2fc25 --- /dev/null +++ b/weed/wdclient/vid_map_test.go @@ -0,0 +1,76 @@ +package wdclient + +import ( + "fmt" + "testing" +) + +func TestLocationIndex(t *testing.T) { + vm := vidMap{} + // test must be failed + mustFailed := func(length int) { + _, err := vm.getLocationIndex(length) + if err == nil { + t.Errorf("length %d must be failed", length) + } + if err.Error() != fmt.Sprintf("invalid length: %d", length) { + t.Errorf("length %d must be failed. error: %v", length, err) + } + } + + mustFailed(-1) + mustFailed(0) + + mustOk := func(length, cursor, expect int) { + if length <= 0 { + t.Fatal("please don't do this") + } + vm.cursor = int32(cursor) + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != expect { + t.Errorf("cursor: %d, length: %d, expect: %d, got: %d\n", cursor, length, expect, got) + return + } + } + + for i := -1; i < 100; i++ { + mustOk(7, i, (i+1)%7) + } + + // when cursor reaches MaxInt64 + mustOk(7, maxCursorIndex, 0) + + // test with constructor + vm = newVidMap() + length := 7 + for i := 0; i < 100; i++ { + got, err := vm.getLocationIndex(length) + if err != nil { + t.Errorf("length: %d, why? %v\n", length, err) + return + } + if got != i%length { + t.Errorf("length: %d, i: %d, got: %d\n", length, i, got) + } + } +} + +func BenchmarkLocationIndex(b *testing.B) { + b.SetParallelism(8) + vm := vidMap{ + cursor: maxCursorIndex - 4000, + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := vm.getLocationIndex(3) + if err != nil { + b.Error(err) + } + } + }) +} |
