aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 21:17:43 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 21:17:43 -0700
commit33c92b819a334b5709e6f1cbe304e4b8855c1238 (patch)
treed35770a2e163088ba7cb789172b8ddb46d666a03
parenta2d34d4802978b294d536966aa98bec34058eea9 (diff)
downloadseaweedfs-33c92b819a334b5709e6f1cbe304e4b8855c1238.tar.xz
seaweedfs-33c92b819a334b5709e6f1cbe304e4b8855c1238.zip
refactoring
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go88
-rw-r--r--weed/storage/tail_volume.go.go78
2 files changed, 91 insertions, 75 deletions
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
index b90517009..b234f5c4d 100644
--- a/unmaintained/volume_tailer/volume_tailer.go
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -1,26 +1,22 @@
package main
import (
- "context"
"flag"
- "fmt"
+ "log"
+ "time"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
weed_server "github.com/chrislusf/seaweedfs/weed/server"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/spf13/viper"
"golang.org/x/tools/godoc/util"
- "google.golang.org/grpc"
-
- "io"
- "log"
)
var (
master = flag.String("master", "localhost:9333", "master server host and port")
volumeId = flag.Int("volumeId", -1, "a volume id")
+ rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.")
timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
showTextFile = flag.Bool("showTextFile", false, "display textual file content")
)
@@ -33,7 +29,16 @@ func main() {
vid := storage.VolumeId(*volumeId)
- err := TailVolume(*master, grpcDialOption, vid, func(n *storage.Needle) (err error) {
+ var sinceTimeNs int64
+ if *rewindDuration == 0 {
+ sinceTimeNs = time.Now().UnixNano()
+ } else if *rewindDuration == -1 {
+ sinceTimeNs = 0
+ } else if *rewindDuration > 0 {
+ sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
+ }
+
+ err := storage.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *storage.Needle) (err error) {
if n.Size == 0 {
println("-", n.String())
return nil
@@ -63,70 +68,3 @@ func main() {
}
}
-
-func TailVolume(master string, grpcDialOption grpc.DialOption, vid storage.VolumeId, fn func(n *storage.Needle) error) error {
- // find volume location, replication, ttl info
- lookup, err := operation.Lookup(master, vid.String())
- if err != nil {
- return fmt.Errorf("Error looking up volume %d: %v", vid, err)
- }
- if len(lookup.Locations) == 0 {
- return fmt.Errorf("unable to locate volume %d", vid)
- }
-
- volumeServer := lookup.Locations[0].Url
-
- return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
-
- stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
- VolumeId: uint32(vid),
- SinceNs: 0,
- DrainingSeconds: uint32(*timeoutSeconds),
- })
- if err != nil {
- return err
- }
-
- for {
- resp, recvErr := stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- break
- } else {
- return recvErr
- }
- }
-
- needleHeader := resp.NeedleHeader
- needleBody := resp.NeedleBody
-
- if len(needleHeader) == 0 {
- continue
- }
-
- for !resp.IsLastChunk {
- resp, recvErr = stream.Recv()
- if recvErr != nil {
- if recvErr == io.EOF {
- break
- } else {
- return recvErr
- }
- }
- needleBody = append(needleBody, resp.NeedleBody...)
- }
-
- n := new(storage.Needle)
- n.ParseNeedleHeader(needleHeader)
- n.ReadNeedleBodyBytes(needleBody, storage.CurrentVersion)
-
- err = fn(n)
-
- if err != nil {
- return err
- }
-
- }
- return nil
- })
-}
diff --git a/weed/storage/tail_volume.go.go b/weed/storage/tail_volume.go.go
new file mode 100644
index 000000000..31ad058b1
--- /dev/null
+++ b/weed/storage/tail_volume.go.go
@@ -0,0 +1,78 @@
+package storage
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "google.golang.org/grpc"
+)
+
+func TailVolume(master string, grpcDialOption grpc.DialOption, vid VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *Needle) error) error {
+ // find volume location, replication, ttl info
+ lookup, err := operation.Lookup(master, vid.String())
+ if err != nil {
+ return fmt.Errorf("look up volume %d: %v", vid, err)
+ }
+ if len(lookup.Locations) == 0 {
+ return fmt.Errorf("unable to locate volume %d", vid)
+ }
+
+ volumeServer := lookup.Locations[0].Url
+
+ return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
+ VolumeId: uint32(vid),
+ SinceNs: sinceNs,
+ DrainingSeconds: uint32(timeoutSeconds),
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ needleHeader := resp.NeedleHeader
+ needleBody := resp.NeedleBody
+
+ if len(needleHeader) == 0 {
+ continue
+ }
+
+ for !resp.IsLastChunk {
+ resp, recvErr = stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ needleBody = append(needleBody, resp.NeedleBody...)
+ }
+
+ n := new(Needle)
+ n.ParseNeedleHeader(needleHeader)
+ n.ReadNeedleBodyBytes(needleBody, CurrentVersion)
+
+ err = fn(n)
+
+ if err != nil {
+ return err
+ }
+
+ }
+ return nil
+ })
+}