aboutsummaryrefslogtreecommitdiff
path: root/unmaintained/stream_read_volume/stream_read_volume.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-27 02:51:31 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-27 02:51:31 -0700
commit225b019fe0b29c2eb073104f83bb9f14c3e345e3 (patch)
tree10f13e6cd844ccc5b1a4e71da8bd2fdc4071ce56 /unmaintained/stream_read_volume/stream_read_volume.go
parent1904448d4eb34b6b0ff3dc71676ec1f5f2d2cd40 (diff)
downloadseaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.tar.xz
seaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.zip
stream read multiple volumes in a volume server
Diffstat (limited to 'unmaintained/stream_read_volume/stream_read_volume.go')
-rw-r--r--unmaintained/stream_read_volume/stream_read_volume.go64
1 files changed, 64 insertions, 0 deletions
diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go
new file mode 100644
index 000000000..e120b9920
--- /dev/null
+++ b/unmaintained/stream_read_volume/stream_read_volume.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "io"
+)
+
+var (
+ volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server")
+ volumeId = flag.Int("volumeId", -1, "a volume id to stream read")
+ grpcDialOption grpc.DialOption
+)
+
+func main() {
+ flag.Parse()
+
+ util.LoadConfiguration("security", false)
+ grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ vid := uint32(*volumeId)
+
+ eachNeedleFunc := func(resp *volume_server_pb.ReadAllNeedlesResponse) error {
+ fmt.Printf("%d,%x%08x %d\n", resp.VolumeId, resp.NeedleId, resp.Cookie, len(resp.NeedleBlob))
+ return nil
+ }
+
+ err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{
+ VolumeIds: []uint32{vid},
+ })
+ if err != nil {
+ return err
+ }
+ for {
+ resp, err := copyFileClient.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ if err = eachNeedleFunc(resp); err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("read %s: %v\n", *volumeServer, err)
+ }
+
+}
+