diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_server.go | 3 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 10 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_cluster.go | 12 | ||||
| -rw-r--r-- | weed/server/master_server.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tier_upload.go | 2 |
5 files changed, 16 insertions, 15 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 928aad253..82b15084d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -51,6 +51,7 @@ import ( type FilerOption struct { Masters map[string]pb.ServerAddress + FilerGroup string Collection string DefaultReplication string DisableDirListing bool @@ -119,7 +120,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 83abdaaad..981f663e4 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -201,13 +201,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ // buffer by 1 so we don't end up getting stuck writing to stopChan forever stopChan := make(chan bool, 1) - clientName, messageChan := ms.addClient(req.ClientType, peerAddress) - for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) { + clientName, messageChan := ms.addClient(req.FilerGroup, req.ClientType, peerAddress) + for _, update := range ms.Cluster.AddClusterNode(req.FilerGroup, req.ClientType, peerAddress, req.Version) { ms.broadcastToClients(update) } defer func() { - for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) { + for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) { ms.broadcastToClients(update) } ms.deleteClient(clientName) @@ -276,8 +276,8 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe return nil } -func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) { - clientName = clientType + "@" + string(clientAddress) +func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) { + clientName = filerGroup + "." + clientType + "@" + string(clientAddress) glog.V(0).Infof("+ client %v", clientName) // we buffer this because otherwise we end up in a potential deadlock where diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go index a5c82627a..220398c6a 100644 --- a/weed/server/master_grpc_server_cluster.go +++ b/weed/server/master_grpc_server_cluster.go @@ -10,26 +10,26 @@ import ( func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) { resp := &master_pb.ListClusterNodesResponse{} - - clusterNodes := ms.Cluster.ListClusterNode(req.ClientType) + filerGroup := cluster.FilerGroup(req.FilerGroup) + clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType) for _, node := range clusterNodes { resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ Address: string(node.Address), Version: node.Version, - IsLeader: ms.Cluster.IsOneLeader(node.Address), + IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address), }) } return resp, nil } -func (ms *MasterServer) GetOneFiler() pb.ServerAddress { +func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress { - clusterNodes := ms.Cluster.ListClusterNode(cluster.FilerType) + clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType) var filers []pb.ServerAddress for _, node := range clusterNodes { - if ms.Cluster.IsOneLeader(node.Address) { + if ms.Cluster.IsOneLeader(filerGroup, node.Address) { filers = append(filers, node.Address) } } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9f29d4ba7..5b8c28698 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -113,7 +113,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.MasterType, option.Master, "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers), adminLocks: NewAdminLocks(), Cluster: cluster.NewCluster(), } @@ -285,7 +285,7 @@ func (ms *MasterServer) startAdminScripts() { for { time.Sleep(time.Duration(sleepMinutes) * time.Minute) if ms.Topo.IsLeader() { - shellOptions.FilerAddress = ms.GetOneFiler() + shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup)) if shellOptions.FilerAddress == "" { continue } diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go index fed15b6ab..c690de959 100644 --- a/weed/server/volume_grpc_tier_upload.go +++ b/weed/server/volume_grpc_tier_upload.go @@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi // locate the disk file diskFile, ok := v.DataBackend.(*backend.DiskFile) if !ok { - return fmt.Errorf("volume %d is not on local disk", req.VolumeId) + return nil // already copied to remove. fmt.Errorf("volume %d is not on local disk", req.VolumeId) } // check valid storage backend type |
