aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2022-07-22 02:57:50 -0700
committerGitHub <noreply@github.com>2022-07-22 02:57:50 -0700
commit0716092b396941e07ab2e41e782641ae658b3793 (patch)
treee13bb6cae85e8ba2454658d27698b6577800abab
parent8d97add89cddb316466776d875e9dc1419932ae6 (diff)
parent994a2dec78bcc0beaf9d643a96b98c9d17bd22b5 (diff)
downloadseaweedfs-0716092b396941e07ab2e41e782641ae658b3793.tar.xz
seaweedfs-0716092b396941e07ab2e41e782641ae658b3793.zip
Merge pull request #3350 from shichanglin5/optimize_masterclient_vidmap
Solve the problem that `LookupFileId` lookup urls is empty due to lea…
-rw-r--r--weed/wdclient/masterclient.go34
-rw-r--r--weed/wdclient/vid_map.go29
-rw-r--r--weed/wdclient/vid_map_test.go73
3 files changed, 123 insertions, 13 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index d6a06bb57..2d0b00b76 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -24,18 +24,20 @@ type MasterClient struct {
grpcDialOption grpc.DialOption
vidMap
+ vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
return &MasterClient{
- FilerGroup: filerGroup,
- clientType: clientType,
- clientHost: clientHost,
- masters: masters,
- grpcDialOption: grpcDialOption,
- vidMap: newVidMap(clientDataCenter),
+ FilerGroup: filerGroup,
+ clientType: clientType,
+ clientHost: clientHost,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ vidMap: newVidMap(clientDataCenter),
+ vidMapCacheSize: 5,
}
}
@@ -175,10 +177,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
return nil
}
- mc.vidMap = newVidMap("")
+ //mc.vidMap = newVidMap("")
+ mc.resetVidMap()
mc.updateVidMap(resp)
} else {
- mc.vidMap = newVidMap("")
+ mc.resetVidMap()
+ //mc.vidMap = newVidMap("")
}
mc.currentMaster = master
@@ -263,3 +267,17 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.
})
})
}
+
+func (mc *MasterClient) resetVidMap() {
+ tail := &vidMap{vid2Locations: mc.vid2Locations, ecVid2Locations: mc.ecVid2Locations, cache: mc.cache}
+ mc.vidMap = newVidMap("")
+ mc.vidMap.cache = tail
+
+ for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
+ if i == mc.vidMapCacheSize-1 {
+ tail.cache = nil
+ } else {
+ tail = tail.cache
+ }
+ }
+}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 754c77051..5771c112a 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -40,6 +40,7 @@ type vidMap struct {
ecVid2Locations map[uint32][]Location
DataCenter string
cursor int32
+ cache *vidMap
}
func newVidMap(dataCenter string) vidMap {
@@ -119,17 +120,29 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error)
}
func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
+ glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
+ locations, found = vc.getLocations(vid)
+ if found && len(locations) > 0 {
+ return locations, found
+ }
+
+ if vc.cache != nil {
+ return vc.cache.GetLocations(vid)
+ }
+
+ return nil, false
+}
+
+func (vc *vidMap) getLocations(vid uint32) (locations []Location, found bool) {
vc.RLock()
defer vc.RUnlock()
- glog.V(4).Infof("~ lookup volume id %d: %+v ec:%+v", vid, vc.vid2Locations, vc.ecVid2Locations)
-
locations, found = vc.vid2Locations[vid]
if found && len(locations) > 0 {
return
}
locations, found = vc.ecVid2Locations[vid]
- return locations, found && len(locations) > 0
+ return
}
func (vc *vidMap) addLocation(vid uint32, location Location) {
@@ -177,6 +190,10 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
}
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
+ if vc.cache != nil {
+ vc.cache.deleteLocation(vid, location)
+ }
+
vc.Lock()
defer vc.Unlock()
@@ -193,10 +210,13 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
break
}
}
-
}
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
+ if vc.cache != nil {
+ vc.cache.deleteLocation(vid, location)
+ }
+
vc.Lock()
defer vc.Unlock()
@@ -213,5 +233,4 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
break
}
}
-
}
diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go
index 0cea698ac..b1cd24490 100644
--- a/weed/wdclient/vid_map_test.go
+++ b/weed/wdclient/vid_map_test.go
@@ -2,6 +2,9 @@ package wdclient
import (
"fmt"
+ "google.golang.org/grpc"
+ "strconv"
+ "sync"
"testing"
)
@@ -59,6 +62,76 @@ func TestLocationIndex(t *testing.T) {
}
}
+func TestLookupFileId(t *testing.T) {
+ mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", nil)
+ length := 5
+
+ //Construct a cache linked list of length 5
+ for i := 0; i < length; i++ {
+ mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)})
+ mc.resetVidMap()
+ }
+ for i := 0; i < length; i++ {
+ locations, found := mc.GetLocations(uint32(i))
+ if !found || len(locations) != 1 || locations[0].Url != strconv.FormatInt(int64(i), 10) {
+ t.Fatalf("urls of vid=%d is not valid.", i)
+ }
+ }
+
+ //When continue to add nodes to the linked list, the previous node will be deleted, and the cache of the response will be gone.
+ for i := length; i < length+5; i++ {
+ mc.addLocation(uint32(i), Location{Url: strconv.FormatInt(int64(i), 10)})
+ mc.resetVidMap()
+ }
+ for i := 0; i < length; i++ {
+ locations, found := mc.GetLocations(uint32(i))
+ if found {
+ t.Fatalf("urls of vid[%d] should not exists, but found: %v", i, locations)
+ }
+ }
+
+ //The delete operation will be applied to all cache nodes
+ _, found := mc.GetLocations(uint32(length))
+ if !found {
+ t.Fatalf("urls of vid[%d] not found", length)
+ }
+
+ //If the locations of the current node exist, return directly
+ newUrl := "abc"
+ mc.addLocation(uint32(length), Location{Url: newUrl})
+ locations, found := mc.GetLocations(uint32(length))
+ if !found || locations[0].Url != newUrl {
+ t.Fatalf("urls of vid[%d] not found", length)
+ }
+
+ //After delete `abc`, cache nodes are searched
+ deleteLoc := Location{Url: newUrl}
+ mc.deleteLocation(uint32(length), deleteLoc)
+ locations, found = mc.GetLocations(uint32(length))
+ if found && locations[0].Url != strconv.FormatInt(int64(length), 10) {
+ t.Fatalf("urls of vid[%d] not expected", length)
+ }
+
+ //lock: concurrent test
+ var wg sync.WaitGroup
+ for i := 0; i < 20; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < 100; i++ {
+ for i := 0; i < 20; i++ {
+ _, _ = mc.GetLocations(uint32(i))
+ }
+ }
+ }()
+ }
+
+ for i := 0; i < 100; i++ {
+ mc.addLocation(uint32(i), Location{})
+ }
+ wg.Wait()
+}
+
func BenchmarkLocationIndex(b *testing.B) {
b.SetParallelism(8)
vm := vidMap{