diff options
| author | chrislu <chris.lu@gmail.com> | 2022-09-14 23:05:30 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-09-14 23:05:30 -0700 |
| commit | c8645fd2323d97164489e0429ed140f84002e61e (patch) | |
| tree | 61b0ef54455002964d1367508bd26df7eaace08f /weed/server | |
| parent | c30f6abb11d74e494abac011d1e98576aaa33bda (diff) | |
| download | seaweedfs-c8645fd2323d97164489e0429ed140f84002e61e.tar.xz seaweedfs-c8645fd2323d97164489e0429ed140f84002e61e.zip | |
master: implement grpc VolumeMarkWritable
fix https://github.com/seaweedfs/seaweedfs/issues/3657
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 24 | ||||
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 64 |
2 files changed, 69 insertions, 19 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index d11a98f93..1848097ec 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -282,3 +282,27 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV return resp, nil } + +func (ms *MasterServer) VolumeMarkReadonly(ctx context.Context, req *master_pb.VolumeMarkReadonlyRequest) (*master_pb.VolumeMarkReadonlyResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + resp := &master_pb.VolumeMarkReadonlyResponse{} + + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(req.ReplicaPlacement)) + vl := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, needle.LoadTTLFromUint32(req.Ttl), types.ToDiskType(req.DiskType)) + dataNodes := ms.Topo.Lookup(req.Collection, needle.VolumeId(req.VolumeId)) + for _, dn := range dataNodes { + if dn.Ip == req.Ip && dn.Port == int(req.Port) { + if req.IsReadonly { + vl.SetVolumeUnavailable(dn, needle.VolumeId(req.VolumeId)) + } else { + vl.SetVolumeAvailable(dn, needle.VolumeId(req.VolumeId), false) + } + } + } + + return resp, nil +} diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 8fa6e27f0..790e6e32a 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/storage" "path/filepath" "time" @@ -148,19 +149,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv resp := &volume_server_pb.VolumeMarkReadonlyResponse{} - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{ - VolumeId: req.VolumeId, - }) - if err != nil { - return fmt.Errorf("set volume %d to read only on master: %v", req.VolumeId, err) - } - return nil - }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) - return resp, fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("volume %d not found", req.VolumeId) + } + + // step 1: stop master from redirecting traffic here + if err := vs.notifyMasterVolumeReadonly(v, true); err != nil { + return resp, err } + // rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2. + + // step 2: mark local volume as readonly err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId)) if err != nil { @@ -169,24 +170,44 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv glog.V(2).Infof("volume mark readonly %v", req) } + // step 3: tell master from redirecting traffic here again, to prevent rare case 1.5 + if err := vs.notifyMasterVolumeReadonly(v, true); err != nil { + return resp, err + } + return resp, err } -func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { - - resp := &volume_server_pb.VolumeMarkWritableResponse{} - +func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error { if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - _, err := client.VolumeMarkWritable(context.Background(), &master_pb.VolumeMarkWritableRequest{ - VolumeId: req.VolumeId, + _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{ + Ip: vs.store.Ip, + Port: uint32(vs.store.Port), + VolumeId: uint32(v.Id), + Collection: v.Collection, + ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), + Ttl: v.Ttl.ToUint32(), + DiskType: string(v.DiskType()), + IsReadonly: isReadOnly, }) if err != nil { - return fmt.Errorf("set volume %d to writable on master: %v", req.VolumeId, err) + return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err) } return nil }); grpcErr != nil { glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) - return resp, fmt.Errorf("grpc VolumeMarkWritable with master %s: %v", vs.GetMaster(), grpcErr) + return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr) + } + return nil +} + +func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { + + resp := &volume_server_pb.VolumeMarkWritableResponse{} + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("volume %d not found", req.VolumeId) } err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId)) @@ -197,6 +218,11 @@ func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_serv glog.V(2).Infof("volume mark writable %v", req) } + // enable master to redirect traffic here + if err := vs.notifyMasterVolumeReadonly(v, false); err != nil { + return resp, err + } + return resp, err } |
