aboutsummaryrefslogtreecommitdiff
path: root/unmaintained
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 11:05:02 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 11:05:02 -0700
commit3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (patch)
treee91672383dc2bc10a75079c4d9ca6ca98eef4994 /unmaintained
parentb142f9f1d57cebd1445baed7ce7fc88bb0f450f0 (diff)
downloadseaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.tar.xz
seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.zip
add volume tailer
Diffstat (limited to 'unmaintained')
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go74
1 files changed, 74 insertions, 0 deletions
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
new file mode 100644
index 000000000..22c3ebab4
--- /dev/null
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -0,0 +1,74 @@
+package main
+
+import (
+ "context"
+ "flag"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/spf13/viper"
+
+ "io"
+ "log"
+)
+
+var (
+ master = flag.String("master", "localhost:9333", "master server host and port")
+ volumeId = flag.Int("volumeId", -1, "a volume id")
+ timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
+)
+
+func main() {
+ flag.Parse()
+
+ weed_server.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "client")
+
+ vid := storage.VolumeId(*volumeId)
+
+ // find volume location, replication, ttl info
+ lookup, err := operation.Lookup(*master, vid.String())
+ if err != nil {
+ log.Printf("Error looking up volume %d: %v", vid, err)
+ return
+ }
+ volumeServer := lookup.Locations[0].Url
+ log.Printf("volume %d is on volume server %s", vid, volumeServer)
+
+ err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
+ VolumeId: uint32(vid),
+ SinceNs: 0,
+ DrainingSeconds: uint32(*timeoutSeconds),
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ println("header:", len(resp.NeedleHeader), "body:", len(resp.NeedleBody))
+ }
+
+ return nil
+
+ })
+
+ if err != nil {
+ log.Printf("Error VolumeTail volume %d: %v", vid, err)
+ }
+
+
+}