diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 11:05:02 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 11:05:02 -0700 |
| commit | 3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (patch) | |
| tree | e91672383dc2bc10a75079c4d9ca6ca98eef4994 /unmaintained | |
| parent | b142f9f1d57cebd1445baed7ce7fc88bb0f450f0 (diff) | |
| download | seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.tar.xz seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.zip | |
add volume tailer
Diffstat (limited to 'unmaintained')
| -rw-r--r-- | unmaintained/volume_tailer/volume_tailer.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go new file mode 100644 index 000000000..22c3ebab4 --- /dev/null +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -0,0 +1,74 @@ +package main + +import ( + "context" + "flag" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/security" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/spf13/viper" + + "io" + "log" +) + +var ( + master = flag.String("master", "localhost:9333", "master server host and port") + volumeId = flag.Int("volumeId", -1, "a volume id") + timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds") +) + +func main() { + flag.Parse() + + weed_server.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client") + + vid := storage.VolumeId(*volumeId) + + // find volume location, replication, ttl info + lookup, err := operation.Lookup(*master, vid.String()) + if err != nil { + log.Printf("Error looking up volume %d: %v", vid, err) + return + } + volumeServer := lookup.Locations[0].Url + log.Printf("volume %d is on volume server %s", vid, volumeServer) + + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ + VolumeId: uint32(vid), + SinceNs: 0, + DrainingSeconds: uint32(*timeoutSeconds), + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + println("header:", len(resp.NeedleHeader), "body:", len(resp.NeedleBody)) + } + + return nil + + }) + + if err != nil { + log.Printf("Error VolumeTail volume %d: %v", vid, err) + } + + +} |
