aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <lebedev_k@tochka.com>2020-11-12 02:13:33 +0500
committerKonstantin Lebedev <lebedev_k@tochka.com>2020-11-12 02:13:33 +0500
commitfc7baef5bb58caa454deb6ea0376a8bd516aa080 (patch)
tree059310f4a2ed78c57ce6c84ea9d33811c5d1502a
parentb73ef6aa95399a30b54cd43260945cbe14e49457 (diff)
downloadseaweedfs-fc7baef5bb58caa454deb6ea0376a8bd516aa080.tar.xz
seaweedfs-fc7baef5bb58caa454deb6ea0376a8bd516aa080.zip
fiil serverUrls sorted by data center
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/filer/filer.go5
-rw-r--r--weed/filer/leveldb/leveldb_store_test.go2
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/master_server.go2
-rw-r--r--weed/shell/commands.go2
-rw-r--r--weed/wdclient/masterclient.go6
-rw-r--r--weed/wdclient/vid_map.go14
-rw-r--r--weed/wdclient/vid_map_test.go2
9 files changed, 20 insertions, 17 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index e241a904e..080312aa8 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -125,7 +125,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 35f4cdc6a..d9c250127 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -44,16 +44,15 @@ type Filer struct {
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
- filerHost string, filerGrpcPort uint32, collection string, replication string, notifyFn func()) *Filer {
+ filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
-
go f.loopProcessingDeletion()
return f
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index b07f81129..44625a7d4 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -11,7 +11,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", nil)
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 065bb3251..dc93ae062 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cc1c4b2ad..ccc94ebac 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
adminLocks: NewAdminLocks(),
}
ms.bounedLeaderChan = make(chan int, 16)
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 1a937ad53..0e285214b 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -45,7 +45,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, "", strings.Split(*options.Masters, ",")),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index e91e6f28e..e39b9dfdf 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -24,14 +24,14 @@ type MasterClient struct {
vidMap
}
-func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, masters []string) *MasterClient {
+func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient {
return &MasterClient{
clientType: clientType,
clientHost: clientHost,
grpcPort: clientGrpcPort,
masters: masters,
grpcDialOption: grpcDialOption,
- vidMap: newVidMap(),
+ vidMap: newVidMap(clientDataCenter),
}
}
@@ -89,7 +89,7 @@ func (mc *MasterClient) tryAllMasters() {
}
mc.currentMaster = ""
- mc.vidMap = newVidMap()
+ mc.vidMap = newVidMap("")
}
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index b72ac3f55..09b9eb71c 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -24,13 +24,14 @@ type Location struct {
type vidMap struct {
sync.RWMutex
vid2Locations map[uint32][]Location
-
- cursor int32
+ DataCenter string
+ cursor int32
}
-func newVidMap() vidMap {
+func newVidMap(dataCenter string) vidMap {
return vidMap{
vid2Locations: make(map[uint32][]Location),
+ DataCenter: dataCenter,
cursor: -1,
}
}
@@ -57,7 +58,11 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return nil, fmt.Errorf("volume %d not found", id)
}
for _, loc := range locations {
- serverUrls = append(serverUrls, loc.Url)
+ if vc.DataCenter == "" || loc.DataCenter == "" || vc.DataCenter != loc.DataCenter {
+ serverUrls = append(serverUrls, loc.Url)
+ } else {
+ serverUrls = append([]string{loc.Url}, serverUrls...)
+ }
}
return
}
@@ -93,7 +98,6 @@ func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error)
func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
vc.RLock()
defer vc.RUnlock()
-
locations, found = vc.vid2Locations[vid]
return
}
diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go
index 87be2fc25..0cea698ac 100644
--- a/weed/wdclient/vid_map_test.go
+++ b/weed/wdclient/vid_map_test.go
@@ -45,7 +45,7 @@ func TestLocationIndex(t *testing.T) {
mustOk(7, maxCursorIndex, 0)
// test with constructor
- vm = newVidMap()
+ vm = newVidMap("")
length := 7
for i := 0; i < 100; i++ {
got, err := vm.getLocationIndex(length)