diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-27 02:51:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-27 02:51:31 -0700 |
| commit | 225b019fe0b29c2eb073104f83bb9f14c3e345e3 (patch) | |
| tree | 10f13e6cd844ccc5b1a4e71da8bd2fdc4071ce56 /unmaintained/stream_read_volume/stream_read_volume.go | |
| parent | 1904448d4eb34b6b0ff3dc71676ec1f5f2d2cd40 (diff) | |
| download | seaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.tar.xz seaweedfs-225b019fe0b29c2eb073104f83bb9f14c3e345e3.zip | |
stream read multiple volumes in a volume server
Diffstat (limited to 'unmaintained/stream_read_volume/stream_read_volume.go')
| -rw-r--r-- | unmaintained/stream_read_volume/stream_read_volume.go | 64 |
1 files changed, 64 insertions, 0 deletions
diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go new file mode 100644 index 000000000..e120b9920 --- /dev/null +++ b/unmaintained/stream_read_volume/stream_read_volume.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "io" +) + +var ( + volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server") + volumeId = flag.Int("volumeId", -1, "a volume id to stream read") + grpcDialOption grpc.DialOption +) + +func main() { + flag.Parse() + + util.LoadConfiguration("security", false) + grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + vid := uint32(*volumeId) + + eachNeedleFunc := func(resp *volume_server_pb.ReadAllNeedlesResponse) error { + fmt.Printf("%d,%x%08x %d\n", resp.VolumeId, resp.NeedleId, resp.Cookie, len(resp.NeedleBlob)) + return nil + } + + err := operation.WithVolumeServerClient(pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{ + VolumeIds: []uint32{vid}, + }) + if err != nil { + return err + } + for { + resp, err := copyFileClient.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + if err = eachNeedleFunc(resp); err != nil { + return err + } + } + return nil + }) + if err != nil { + fmt.Printf("read %s: %v\n", *volumeServer, err) + } + +} + |
