aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 20:04:44 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 20:04:44 -0700
commit072644969e276d6071bed13cbf95320d25b8d468 (patch)
tree23e5c16485c15f960de6faa176bf27679ccf0b75
parentfa176fe80f77e569d03b2975a5c15cc213f37ea4 (diff)
downloadseaweedfs-072644969e276d6071bed13cbf95320d25b8d468.tar.xz
seaweedfs-072644969e276d6071bed13cbf95320d25b8d468.zip
refactoring
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go75
1 files changed, 44 insertions, 31 deletions
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
index 8a662ef5e..b90517009 100644
--- a/unmaintained/volume_tailer/volume_tailer.go
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -11,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/spf13/viper"
"golang.org/x/tools/godoc/util"
+ "google.golang.org/grpc"
"io"
"log"
@@ -31,16 +33,50 @@ func main() {
vid := storage.VolumeId(*volumeId)
+ err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) {
+ if n.Size == 0 {
+ println("-", n.String())
+ return nil
+ } else {
+ println("+", n.String())
+ }
+
+ if *showTextFile {
+
+ data := n.Data
+ if n.IsGzipped() {
+ if data, err = operation.UnGzipData(data); err != nil {
+ return err
+ }
+ }
+ if util.IsText(data) {
+ println(string(data))
+ }
+
+ println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data))
+ }
+ return nil
+ })
+
+ if err != nil {
+ log.Printf("Error VolumeTail volume %d: %v", vid, err)
+ }
+
+}
+
+func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error {
// find volume location, replication, ttl info
- lookup, err := operation.Lookup(*master, vid.String())
+ lookup, err := operation.Lookup(master, vid.String())
if err != nil {
- log.Printf("Error looking up volume %d: %v", vid, err)
- return
+ return fmt.Errorf("Error looking up volume %d: %v", vid, err)
}
+ if len(lookup.Locations) == 0 {
+ return fmt.Errorf("unable to locate volume %d", vid)
+ }
+
volumeServer := lookup.Locations[0].Url
- log.Printf("volume %d is on volume server %s", vid, volumeServer)
- err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
VolumeId: uint32(vid),
@@ -84,36 +120,13 @@ func main() {
n.ParseNeedleHeader(needleHeader)
n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion)
- if n.Size == 0 {
- println("-", n.String())
- continue
- } else {
- println("+", n.String())
- }
-
- if *showTextFile {
-
- data := n.Data
- if n.IsGzipped() {
- if data, err = operation.UnGzipData(data); err != nil {
- continue
- }
- }
- if util.IsText(data) {
- println(string(data))
- }
+ err = fn(n)
- println("-", n.String(), "compressed", n.IsGzipped(), "original size", len(data))
+ if err != nil {
+ return err
}
}
-
return nil
-
})
-
- if err != nil {
- log.Printf("Error VolumeTail volume %d: %v", vid, err)
- }
-
}