diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-27 02:51:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-27 02:51:31 -0700 |
| commit | 225b019fe0b29c2eb073104f83bb9f14c3e345e3 (patch) | |
| tree | 10f13e6cd844ccc5b1a4e71da8bd2fdc4071ce56 /weed/server/volume_grpc_read_all.go | |
| parent | 1904448d4eb34b6b0ff3dc71676ec1f5f2d2cd40 (diff) | |
| download | seaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.tar.xz seaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.zip | |
stream read multiple volumes in a volume server
Diffstat (limited to 'weed/server/volume_grpc_read_all.go')
| -rw-r--r-- | weed/server/volume_grpc_read_all.go | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/weed/server/volume_grpc_read_all.go b/weed/server/volume_grpc_read_all.go index 21b42d8b1..3ee0b7d86 100644 --- a/weed/server/volume_grpc_read_all.go +++ b/weed/server/volume_grpc_read_all.go @@ -10,13 +10,23 @@ import ( func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) { - v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + for _, vid := range req.VolumeIds { + if err := vs.streaReadOneVolume(needle.VolumeId(vid), stream, err); err != nil { + return err + } + } + return nil +} + +func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer, err error) error { + v := vs.store.GetVolume(vid) if v == nil { - return fmt.Errorf("not found volume id %d", req.VolumeId) + return fmt.Errorf("not found volume id %d", vid) } scanner := &VolumeFileScanner4ReadAll{ stream: stream, + v: v, } offset := int64(v.SuperBlock.BlockSize()) @@ -24,11 +34,11 @@ func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesReque err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, offset, scanner) return err - } type VolumeFileScanner4ReadAll struct { stream volume_server_pb.VolumeServer_ReadAllNeedlesServer + v *storage.Volume } func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -42,7 +52,9 @@ func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { sendErr := scanner.stream.Send(&volume_server_pb.ReadAllNeedlesResponse{ + VolumeId: uint32(scanner.v.Id), NeedleId: uint64(n.Id), + Cookie: uint32(n.Cookie), NeedleBlob: n.Data, }) if sendErr != nil { |
