aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/volume_grpc_read_all.go34
-rw-r--r--weed/storage/volume_read_all.go42
2 files changed, 45 insertions, 31 deletions
diff --git a/weed/server/volume_grpc_read_all.go b/weed/server/volume_grpc_read_all.go
index 3ee0b7d86..7fe5bad03 100644
--- a/weed/server/volume_grpc_read_all.go
+++ b/weed/server/volume_grpc_read_all.go
@@ -5,7 +5,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
@@ -24,9 +23,9 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se
return fmt.Errorf("not found volume id %d", vid)
}
- scanner := &VolumeFileScanner4ReadAll{
- stream: stream,
- v: v,
+ scanner := &storage.VolumeFileScanner4ReadAll{
+ Stream: stream,
+ V: v,
}
offset := int64(v.SuperBlock.BlockSize())
@@ -35,30 +34,3 @@ func (vs *VolumeServer) streaReadOneVolume(vid needle.VolumeId, stream volume_se
return err
}
-
-type VolumeFileScanner4ReadAll struct {
- stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
- v *storage.Volume
-}
-
-func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
- return nil
-
-}
-func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
- return true
-}
-
-func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
-
- sendErr := scanner.stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
- VolumeId: uint32(scanner.v.Id),
- NeedleId: uint64(n.Id),
- Cookie: uint32(n.Cookie),
- NeedleBlob: n.Data,
- })
- if sendErr != nil {
- return sendErr
- }
- return nil
-}
diff --git a/weed/storage/volume_read_all.go b/weed/storage/volume_read_all.go
new file mode 100644
index 000000000..453a4495c
--- /dev/null
+++ b/weed/storage/volume_read_all.go
@@ -0,0 +1,42 @@
+package storage
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+type VolumeFileScanner4ReadAll struct {
+ Stream volume_server_pb.VolumeServer_ReadAllNeedlesServer
+ V *Volume
+}
+
+func (scanner *VolumeFileScanner4ReadAll) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ return nil
+
+}
+func (scanner *VolumeFileScanner4ReadAll) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4ReadAll) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+
+ nv, ok := scanner.V.nm.Get(n.Id)
+ if !ok {
+ return nil
+ }
+ if nv.Offset.ToActualOffset() != offset {
+ return nil
+ }
+
+ sendErr := scanner.Stream.Send(&volume_server_pb.ReadAllNeedlesResponse{
+ VolumeId: uint32(scanner.V.Id),
+ NeedleId: uint64(n.Id),
+ Cookie: uint32(n.Cookie),
+ NeedleBlob: n.Data,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ return nil
+}