aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-05-03 00:13:57 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-05-03 00:13:57 +0500
commitd8925b4e832b9ed12deae0b9b94cd5f050f2f258 (patch)
treea197c8619048285e87c89932666ec0c7b2434f40 /weed/server
parent7640e650e5e647e6d6f3c8043c1e0a7442f154c7 (diff)
parent998c1973eae6cbf5f91a80d3d6a864db91a3d38f (diff)
downloadseaweedfs-d8925b4e832b9ed12deae0b9b94cd5f050f2f258.tar.xz
seaweedfs-d8925b4e832b9ed12deae0b9b94cd5f050f2f258.zip
Merge branch 'new_master' into ydb
# Conflicts: # go.mod # go.sum
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_server.go3
-rw-r--r--weed/server/master_grpc_server.go10
-rw-r--r--weed/server/master_grpc_server_cluster.go12
-rw-r--r--weed/server/master_server.go4
-rw-r--r--weed/server/volume_grpc_tier_upload.go2
5 files changed, 16 insertions, 15 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 928aad253..82b15084d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -51,6 +51,7 @@ import (
type FilerOption struct {
Masters map[string]pb.ServerAddress
+ FilerGroup string
Collection string
DefaultReplication string
DisableDirListing bool
@@ -119,7 +120,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 83abdaaad..981f663e4 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -201,13 +201,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
// buffer by 1 so we don't end up getting stuck writing to stopChan forever
stopChan := make(chan bool, 1)
- clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
- for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) {
+ clientName, messageChan := ms.addClient(req.FilerGroup, req.ClientType, peerAddress)
+ for _, update := range ms.Cluster.AddClusterNode(req.FilerGroup, req.ClientType, peerAddress, req.Version) {
ms.broadcastToClients(update)
}
defer func() {
- for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) {
+ for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) {
ms.broadcastToClients(update)
}
ms.deleteClient(clientName)
@@ -276,8 +276,8 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
return nil
}
-func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
- clientName = clientType + "@" + string(clientAddress)
+func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
+ clientName = filerGroup + "." + clientType + "@" + string(clientAddress)
glog.V(0).Infof("+ client %v", clientName)
// we buffer this because otherwise we end up in a potential deadlock where
diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go
index a5c82627a..220398c6a 100644
--- a/weed/server/master_grpc_server_cluster.go
+++ b/weed/server/master_grpc_server_cluster.go
@@ -10,26 +10,26 @@ import (
func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
resp := &master_pb.ListClusterNodesResponse{}
-
- clusterNodes := ms.Cluster.ListClusterNode(req.ClientType)
+ filerGroup := cluster.FilerGroup(req.FilerGroup)
+ clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType)
for _, node := range clusterNodes {
resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
Address: string(node.Address),
Version: node.Version,
- IsLeader: ms.Cluster.IsOneLeader(node.Address),
+ IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
})
}
return resp, nil
}
-func (ms *MasterServer) GetOneFiler() pb.ServerAddress {
+func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress {
- clusterNodes := ms.Cluster.ListClusterNode(cluster.FilerType)
+ clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType)
var filers []pb.ServerAddress
for _, node := range clusterNodes {
- if ms.Cluster.IsOneLeader(node.Address) {
+ if ms.Cluster.IsOneLeader(filerGroup, node.Address) {
filers = append(filers, node.Address)
}
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9f29d4ba7..5b8c28698 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -113,7 +113,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.MasterType, option.Master, "", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers),
adminLocks: NewAdminLocks(),
Cluster: cluster.NewCluster(),
}
@@ -285,7 +285,7 @@ func (ms *MasterServer) startAdminScripts() {
for {
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() {
- shellOptions.FilerAddress = ms.GetOneFiler()
+ shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup))
if shellOptions.FilerAddress == "" {
continue
}
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
index fed15b6ab..c690de959 100644
--- a/weed/server/volume_grpc_tier_upload.go
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
// locate the disk file
diskFile, ok := v.DataBackend.(*backend.DiskFile)
if !ok {
- return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ return nil // already copied to remove. fmt.Errorf("volume %d is not on local disk", req.VolumeId)
}
// check valid storage backend type