aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/chunked_file.go5
-rw-r--r--weed/operation/compress.go95
-rw-r--r--weed/operation/tail_volume.go78
-rw-r--r--weed/operation/upload_content.go3
4 files changed, 83 insertions, 98 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 995f06b53..295204dd8 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -4,12 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
- "google.golang.org/grpc"
"io"
"io/ioutil"
"net/http"
"sort"
+ "google.golang.org/grpc"
+
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -55,7 +56,7 @@ func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
if isGzipped {
var err error
- if buffer, err = UnGzipData(buffer); err != nil {
+ if buffer, err = util.UnGzipData(buffer); err != nil {
return nil, err
}
}
diff --git a/weed/operation/compress.go b/weed/operation/compress.go
deleted file mode 100644
index 7190eeeb2..000000000
--- a/weed/operation/compress.go
+++ /dev/null
@@ -1,95 +0,0 @@
-package operation
-
-import (
- "bytes"
- "compress/flate"
- "compress/gzip"
- "io/ioutil"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "golang.org/x/tools/godoc/util"
-)
-
-/*
-* Default more not to gzip since gzip can be done on client side.
- */
-func IsGzippable(ext, mtype string, data []byte) bool {
-
- shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype)
- if iAmSure {
- return shouldBeZipped
- }
-
- isMostlyText := util.IsText(data)
-
- return isMostlyText
-}
-
-/*
-* Default more not to gzip since gzip can be done on client side.
- */
-func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) {
-
- // text
- if strings.HasPrefix(mtype, "text/") {
- return true, true
- }
-
- // images
- switch ext {
- case ".svg", ".bmp":
- return true, true
- }
- if strings.HasPrefix(mtype, "image/") {
- return false, true
- }
-
- // by file name extension
- switch ext {
- case ".zip", ".rar", ".gz", ".bz2", ".xz":
- return false, true
- case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
- return true, true
- case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp":
- return true, true
- case ".png", ".jpg", ".jpeg":
- return false, true
- }
-
- // by mime type
- if strings.HasPrefix(mtype, "application/") {
- if strings.HasSuffix(mtype, "xml") {
- return true, true
- }
- if strings.HasSuffix(mtype, "script") {
- return true, true
- }
- }
-
- return false, false
-}
-
-func GzipData(input []byte) ([]byte, error) {
- buf := new(bytes.Buffer)
- w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
- if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
- return nil, err
- }
- if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
- return nil, err
- }
- return buf.Bytes(), nil
-}
-func UnGzipData(input []byte) ([]byte, error) {
- buf := bytes.NewBuffer(input)
- r, _ := gzip.NewReader(buf)
- defer r.Close()
- output, err := ioutil.ReadAll(r)
- if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
- }
- return output, err
-}
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
new file mode 100644
index 000000000..0c4f96654
--- /dev/null
+++ b/weed/operation/tail_volume.go
@@ -0,0 +1,78 @@
+package operation
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
+ // find volume location, replication, ttl info
+ lookup, err := Lookup(master, vid.String())
+ if err != nil {
+ return fmt.Errorf("look up volume %d: %v", vid, err)
+ }
+ if len(lookup.Locations) == 0 {
+ return fmt.Errorf("unable to locate volume %d", vid)
+ }
+
+ volumeServer := lookup.Locations[0].Url
+
+ return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{
+ VolumeId: uint32(vid),
+ SinceNs: sinceNs,
+ DrainingSeconds: uint32(timeoutSeconds),
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ needleHeader := resp.NeedleHeader
+ needleBody := resp.NeedleBody
+
+ if len(needleHeader) == 0 {
+ continue
+ }
+
+ for !resp.IsLastChunk {
+ resp, recvErr = stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ needleBody = append(needleBody, resp.NeedleBody...)
+ }
+
+ n := new(needle.Needle)
+ n.ParseNeedleHeader(needleHeader)
+ n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
+
+ err = fn(n)
+
+ if err != nil {
+ return err
+ }
+
+ }
+ return nil
+ })
+}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 4417a0e70..c387d0230 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -18,6 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type UploadResult struct {
@@ -59,7 +60,7 @@ func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped boo
contentIsGzipped := isGzipped
shouldGzipNow := false
if !isGzipped {
- if shouldBeZipped, iAmSure := IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
shouldGzipNow = true
contentIsGzipped = true
}