diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 21:43:36 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 21:43:36 -0700 |
| commit | e5506152c0a27d38fa334b2e338d82ee02669ab9 (patch) | |
| tree | 1f589cbbf7244cbe5dbfe84ca89f5996e4ca9ff3 /weed/operation | |
| parent | 33c92b819a334b5709e6f1cbe304e4b8855c1238 (diff) | |
| download | seaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.tar.xz seaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.zip | |
refactoring
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/chunked_file.go | 5 | ||||
| -rw-r--r-- | weed/operation/compress.go | 95 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 78 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 3 |
4 files changed, 83 insertions, 98 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 995f06b53..295204dd8 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -4,12 +4,13 @@ import ( "encoding/json" "errors" "fmt" - "google.golang.org/grpc" "io" "io/ioutil" "net/http" "sort" + "google.golang.org/grpc" + "sync" "github.com/chrislusf/seaweedfs/weed/glog" @@ -55,7 +56,7 @@ func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { if isGzipped { var err error - if buffer, err = UnGzipData(buffer); err != nil { + if buffer, err = util.UnGzipData(buffer); err != nil { return nil, err } } diff --git a/weed/operation/compress.go b/weed/operation/compress.go deleted file mode 100644 index 7190eeeb2..000000000 --- a/weed/operation/compress.go +++ /dev/null @@ -1,95 +0,0 @@ -package operation - -import ( - "bytes" - "compress/flate" - "compress/gzip" - "io/ioutil" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "golang.org/x/tools/godoc/util" -) - -/* -* Default more not to gzip since gzip can be done on client side. - */ -func IsGzippable(ext, mtype string, data []byte) bool { - - shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype) - if iAmSure { - return shouldBeZipped - } - - isMostlyText := util.IsText(data) - - return isMostlyText -} - -/* -* Default more not to gzip since gzip can be done on client side. - */ -func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) { - - // text - if strings.HasPrefix(mtype, "text/") { - return true, true - } - - // images - switch ext { - case ".svg", ".bmp": - return true, true - } - if strings.HasPrefix(mtype, "image/") { - return false, true - } - - // by file name extension - switch ext { - case ".zip", ".rar", ".gz", ".bz2", ".xz": - return false, true - case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json": - return true, true - case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp": - return true, true - case ".png", ".jpg", ".jpeg": - return false, true - } - - // by mime type - if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true, true - } - if strings.HasSuffix(mtype, "script") { - return true, true - } - } - - return false, false -} - -func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) - if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) - return nil, err - } - if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) - return nil, err - } - return buf.Bytes(), nil -} -func UnGzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) - } - return output, err -} diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go new file mode 100644 index 000000000..0c4f96654 --- /dev/null +++ b/weed/operation/tail_volume.go @@ -0,0 +1,78 @@ +package operation + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { + // find volume location, replication, ttl info + lookup, err := Lookup(master, vid.String()) + if err != nil { + return fmt.Errorf("look up volume %d: %v", vid, err) + } + if len(lookup.Locations) == 0 { + return fmt.Errorf("unable to locate volume %d", vid) + } + + volumeServer := lookup.Locations[0].Url + + return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + stream, err := client.VolumeTail(context.Background(), &volume_server_pb.VolumeTailRequest{ + VolumeId: uint32(vid), + SinceNs: sinceNs, + DrainingSeconds: uint32(timeoutSeconds), + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + needleHeader := resp.NeedleHeader + needleBody := resp.NeedleBody + + if len(needleHeader) == 0 { + continue + } + + for !resp.IsLastChunk { + resp, recvErr = stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + needleBody = append(needleBody, resp.NeedleBody...) + } + + n := new(needle.Needle) + n.ParseNeedleHeader(needleHeader) + n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion) + + err = fn(n) + + if err != nil { + return err + } + + } + return nil + }) +} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 4417a0e70..c387d0230 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -18,6 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) type UploadResult struct { @@ -59,7 +60,7 @@ func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped boo contentIsGzipped := isGzipped shouldGzipNow := false if !isGzipped { - if shouldBeZipped, iAmSure := IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { + if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { shouldGzipNow = true contentIsGzipped = true } |
