aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-14 23:05:30 -0700
committerchrislu <chris.lu@gmail.com>2022-09-14 23:05:30 -0700
commitc8645fd2323d97164489e0429ed140f84002e61e (patch)
tree61b0ef54455002964d1367508bd26df7eaace08f /weed/server
parentc30f6abb11d74e494abac011d1e98576aaa33bda (diff)
downloadseaweedfs-c8645fd2323d97164489e0429ed140f84002e61e.tar.xz
seaweedfs-c8645fd2323d97164489e0429ed140f84002e61e.zip
master: implement grpc VolumeMarkWritable
fix https://github.com/seaweedfs/seaweedfs/issues/3657
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server_volume.go24
-rw-r--r--weed/server/volume_grpc_admin.go64
2 files changed, 69 insertions, 19 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index d11a98f93..1848097ec 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -282,3 +282,27 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
return resp, nil
}
+
+func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.VolumeMarkReadonlyResponse{}
+
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement))
+ vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType))
+ dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId))
+ for _, dn := range dataNodes {
+ if dn.Ip == req.Ip && dn.Port == int(req.Port) {
+ if req.IsReadonly {
+ vl.SetVolumeUnavailable(dn, needle.VolumeId(req.VolumeId))
+ } else {
+ vl.SetVolumeAvailable(dn, needle.VolumeId(req.VolumeId), false)
+ }
+ }
+ }
+
+ return resp, nil
+}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 8fa6e27f0..790e6e32a 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/storage"
"path/filepath"
"time"
@@ -148,19 +149,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
- if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
- VolumeId: req.VolumeId,
- })
- if err != nil {
- return fmt.Errorf("set volume %d to read only on master: %v", req.VolumeId, err)
- }
- return nil
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
- return resp, fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // step 1: stop master from redirecting traffic here
+ if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
+ return resp, err
}
+ // rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
+
+ // step 2: mark local volume as readonly
err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
if err != nil {
@@ -169,24 +170,44 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
glog.V(2).Infof("volume mark readonly %v", req)
}
+ // step 3: tell master from redirecting traffic here again, to prevent rare case 1.5
+ if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
+ return resp, err
+ }
+
return resp, err
}
-func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
-
- resp := &volume_server_pb.VolumeMarkWritableResponse{}
-
+func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- _, err := client.VolumeMarkWritable(context.Background(), &master_pb.VolumeMarkWritableRequest{
- VolumeId: req.VolumeId,
+ _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
+ Ip: vs.store.Ip,
+ Port: uint32(vs.store.Port),
+ VolumeId: uint32(v.Id),
+ Collection: v.Collection,
+ ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
+ Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.DiskType()),
+ IsReadonly: isReadOnly,
})
if err != nil {
- return fmt.Errorf("set volume %d to writable on master: %v", req.VolumeId, err)
+ return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err)
}
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
- return resp, fmt.Errorf("grpc VolumeMarkWritable with master %s: %v", vs.GetMaster(), grpcErr)
+ return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
+ }
+ return nil
+}
+
+func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkWritableResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("volume %d not found", req.VolumeId)
}
err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
@@ -197,6 +218,11 @@ func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_serv
glog.V(2).Infof("volume mark writable %v", req)
}
+ // enable master to redirect traffic here
+ if err := vs.notifyMasterVolumeReadonly(v, false); err != nil {
+ return resp, err
+ }
+
return resp, err
}