aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go40
-rw-r--r--weed/operation/chunked_file.go14
-rw-r--r--weed/operation/compress.go82
-rw-r--r--weed/operation/data_struts.go1
-rw-r--r--weed/operation/delete_content.go37
-rw-r--r--weed/operation/grpc_client.go30
-rw-r--r--weed/operation/list_masters.go32
-rw-r--r--weed/operation/lookup.go9
-rw-r--r--weed/operation/stats.go10
-rw-r--r--weed/operation/submit.go26
-rw-r--r--weed/operation/sync_volume.go54
-rw-r--r--weed/operation/tail_volume.go82
-rw-r--r--weed/operation/upload_content.go51
13 files changed, 219 insertions, 249 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 00e1caad5..4c50eaa26 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,9 +3,11 @@ package operation
import (
"context"
"fmt"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+ "strings"
)
type VolumeAssignRequest struct {
@@ -19,14 +21,15 @@ type VolumeAssignRequest struct {
}
type AssignResult struct {
- Fid string `json:"fid,omitempty"`
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
- Count uint64 `json:"count,omitempty"`
- Error string `json:"error,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+ Count uint64 `json:"count,omitempty"`
+ Error string `json:"error,omitempty"`
+ Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -40,9 +43,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
continue
}
- lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: primaryRequest.Count,
@@ -53,7 +54,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
Rack: primaryRequest.Rack,
DataNode: primaryRequest.DataNode,
}
- resp, grpcErr := masterClient.Assign(ctx, req)
+ resp, grpcErr := masterClient.Assign(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
@@ -63,6 +64,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
ret.Url = resp.Url
ret.PublicUrl = resp.PublicUrl
ret.Error = resp.Error
+ ret.Auth = security.EncodedJwt(resp.Auth)
return nil
@@ -81,3 +83,17 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
return ret, lastError
}
+
+func LookupJwt(master string, fileId string) security.EncodedJwt {
+
+ tokenStr := ""
+
+ if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil {
+ bearer := h.Get("Authorization")
+ if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
+ tokenStr = bearer[7:]
+ }
+ }
+
+ return security.EncodedJwt(tokenStr)
+}
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 9d8267dee..295204dd8 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -5,9 +5,12 @@ import (
"errors"
"fmt"
"io"
+ "io/ioutil"
"net/http"
"sort"
+ "google.golang.org/grpc"
+
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -53,7 +56,7 @@ func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
if isGzipped {
var err error
- if buffer, err = UnGzipData(buffer); err != nil {
+ if buffer, err = util.UnGzipData(buffer); err != nil {
return nil, err
}
}
@@ -69,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string) error {
+func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, fileIds)
+ results, err := DeleteFiles(master, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -102,7 +105,10 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
if err != nil {
return written, err
}
- defer resp.Body.Close()
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:
diff --git a/weed/operation/compress.go b/weed/operation/compress.go
deleted file mode 100644
index 65979d529..000000000
--- a/weed/operation/compress.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package operation
-
-import (
- "bytes"
- "compress/flate"
- "compress/gzip"
- "io/ioutil"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "golang.org/x/tools/godoc/util"
-)
-
-/*
-* Default more not to gzip since gzip can be done on client side.
- */
-func IsGzippable(ext, mtype string, data []byte) bool {
-
- // text
- if strings.HasPrefix(mtype, "text/") {
- return true
- }
-
- // images
- switch ext {
- case ".svg", ".bmp":
- return true
- }
- if strings.HasPrefix(mtype, "image/") {
- return false
- }
-
- // by file name extention
- switch ext {
- case ".zip", ".rar", ".gz", ".bz2", ".xz":
- return false
- case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
- return true
- case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp":
- return true
- case ".png", ".jpg", ".jpeg":
- return false
- }
-
- // by mime type
- if strings.HasPrefix(mtype, "application/") {
- if strings.HasSuffix(mtype, "xml") {
- return true
- }
- if strings.HasSuffix(mtype, "script") {
- return true
- }
- }
-
- isMostlyText := util.IsText(data)
-
- return isMostlyText
-}
-
-func GzipData(input []byte) ([]byte, error) {
- buf := new(bytes.Buffer)
- w, _ := gzip.NewWriterLevel(buf, flate.BestCompression)
- if _, err := w.Write(input); err != nil {
- glog.V(2).Infoln("error compressing data:", err)
- return nil, err
- }
- if err := w.Close(); err != nil {
- glog.V(2).Infoln("error closing compressed data:", err)
- return nil, err
- }
- return buf.Bytes(), nil
-}
-func UnGzipData(input []byte) ([]byte, error) {
- buf := bytes.NewBuffer(input)
- r, _ := gzip.NewReader(buf)
- defer r.Close()
- output, err := ioutil.ReadAll(r)
- if err != nil {
- glog.V(2).Infoln("error uncompressing data:", err)
- }
- return output, err
-}
diff --git a/weed/operation/data_struts.go b/weed/operation/data_struts.go
index bfc53aa50..4980f9913 100644
--- a/weed/operation/data_struts.go
+++ b/weed/operation/data_struts.go
@@ -2,6 +2,5 @@ package operation
type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
- SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"`
}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 3e468e1a3..6d84be76f 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
type DeleteResult struct {
@@ -28,17 +28,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, vids)
+ return LookupVolumeIds(master, grpcDialOption, vids)
}
- return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
-func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -48,7 +48,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
vid, _, err := ParseFileId(fileId)
if err != nil {
ret = append(ret, &volume_server_pb.DeleteResult{
- FileId: vid,
+ FileId: fileId,
Status: http.StatusBadRequest,
Error: err.Error()},
)
@@ -85,38 +85,43 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
}
}
+ resultChan := make(chan []*volume_server_pb.DeleteResult, len(server_to_fileIds))
var wg sync.WaitGroup
-
for server, fidList := range server_to_fileIds {
wg.Add(1)
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
} else {
- ret = append(ret, deleteResults...)
+ resultChan <- deleteResults
}
}(server, fidList)
}
wg.Wait()
+ close(resultChan)
+
+ for result := range resultChan {
+ ret = append(ret, result...)
+ }
+
+ glog.V(0).Infof("deleted %d items", len(ret))
return ret, err
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
}
- resp, err := volumeServerClient.BatchDelete(ctx, req)
+ resp, err := volumeServerClient.BatchDelete(context.Background(), req)
// fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp)
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index d0931a8d3..f6b2b69e9 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,34 +1,30 @@
package operation
import (
+ "context"
"fmt"
- "strconv"
- "strings"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
+ "strconv"
+ "strings"
)
-var (
- grpcClients = make(map[string]*grpc.ClientConn)
- grpcClientsLock sync.Mutex
-)
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
-func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
+ ctx := context.Background()
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
return err
}
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress)
+ }, grpcAddress, grpcDialOption)
}
@@ -42,16 +38,18 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
+func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
+
+ ctx := context.Background()
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0)
+ masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
if parseErr != nil {
- return fmt.Errorf("failed to parse master grpc %v", masterServer)
+ return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress)
+ }, masterGrpcAddress, grpcDialOption)
}
diff --git a/weed/operation/list_masters.go b/weed/operation/list_masters.go
deleted file mode 100644
index 75838de4d..000000000
--- a/weed/operation/list_masters.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package operation
-
-import (
- "encoding/json"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
-}
-
-func ListMasters(server string) (leader string, peers []string, err error) {
- jsonBlob, err := util.Get("http://" + server + "/cluster/status")
- glog.V(2).Info("list masters result :", string(jsonBlob))
- if err != nil {
- return "", nil, err
- }
- var ret ClusterStatusResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return "", nil, err
- }
- peers = ret.Peers
- if ret.IsLeader {
- peers = append(peers, ret.Leader)
- }
- return ret.Leader, peers, nil
-}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 562a11580..d0773e7fd 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
@@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -98,14 +99,12 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
//only query unknown_vids
- err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,
}
- resp, grpcErr := masterClient.LookupVolume(ctx, req)
+ resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
index 364727272..b69a33750 100644
--- a/weed/operation/stats.go
+++ b/weed/operation/stats.go
@@ -2,18 +2,16 @@ package operation
import (
"context"
- "time"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
-func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
+func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
- err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- grpcResponse, grpcErr := masterClient.Statistics(ctx, req)
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 7a1a3085e..bdf59d966 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -36,10 +37,8 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int,
- secret security.Secret,
-) ([]SubmitResult, error) {
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
+ replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -51,7 +50,7 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -67,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, secret)
+ results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -110,8 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
- jwt := security.GenJwt(secret, fi.Fid)
+func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -139,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -152,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -170,10 +168,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fileUrl,
- jwt)
+ ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -188,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index e40c7de41..5562f12ab 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -2,63 +2,19 @@ package operation
import (
"context"
- "fmt"
- "io"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
)
-func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
+ WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{
- VolumdId: vid,
+ resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
+ VolumeId: vid,
})
return nil
})
return
}
-
-func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
-
- return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
- stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
- VolumdId: vid,
- })
- if err != nil {
- return err
- }
-
- var indexFileContent []byte
-
- for {
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return fmt.Errorf("read index entries: %v", err)
- }
- indexFileContent = append(indexFileContent, resp.IndexFileContent...)
- }
-
- dataSize := len(indexFileContent)
-
- for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize {
- line := indexFileContent[idx : idx+NeedleEntrySize]
- key := BytesToNeedleId(line[:NeedleIdSize])
- offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize])
- size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize])
- eachEntryFn(key, offset, size)
- }
-
- return nil
- })
-}
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
new file mode 100644
index 000000000..b53f18ce1
--- /dev/null
+++ b/weed/operation/tail_volume.go
@@ -0,0 +1,82 @@
+package operation
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
+ // find volume location, replication, ttl info
+ lookup, err := Lookup(master, vid.String())
+ if err != nil {
+ return fmt.Errorf("look up volume %d: %v", vid, err)
+ }
+ if len(lookup.Locations) == 0 {
+ return fmt.Errorf("unable to locate volume %d", vid)
+ }
+
+ volumeServer := lookup.Locations[0].Url
+
+ return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
+}
+
+func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
+ return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+ VolumeId: uint32(vid),
+ SinceNs: sinceNs,
+ IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
+ })
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ needleHeader := resp.NeedleHeader
+ needleBody := resp.NeedleBody
+
+ if len(needleHeader) == 0 {
+ continue
+ }
+
+ for !resp.IsLastChunk {
+ resp, recvErr = stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ needleBody = append(needleBody, resp.NeedleBody...)
+ }
+
+ n := new(needle.Needle)
+ n.ParseNeedleHeader(needleHeader)
+ n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion)
+
+ err = fn(n)
+
+ if err != nil {
+ return err
+ }
+
+ }
+ return nil
+ })
+}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 030bf5889..c387d0230 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,6 +2,8 @@ package operation
import (
"bytes"
+ "compress/flate"
+ "compress/gzip"
"encoding/json"
"errors"
"fmt"
@@ -16,6 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type UploadResult struct {
@@ -37,13 +40,43 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-// Upload sends a POST request to a volume server to upload the content
+// Upload sends a POST request to a volume server to upload the content with adjustable compression level
+func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) {
+ if compressionLevel < 1 {
+ compressionLevel = 1
+ }
+ if compressionLevel > 9 {
+ compressionLevel = 9
+ }
+ return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt)
+}
+
+// Upload sends a POST request to a volume server to upload the content with fast compression
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+ return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt)
+}
+
+func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) {
+ contentIsGzipped := isGzipped
+ shouldGzipNow := false
+ if !isGzipped {
+ if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ shouldGzipNow = true
+ contentIsGzipped = true
+ }
+ }
return upload_content(uploadUrl, func(w io.Writer) (err error) {
- _, err = io.Copy(w, reader)
+ if shouldGzipNow {
+ gzWriter, _ := gzip.NewWriterLevel(w, compression)
+ _, err = io.Copy(gzWriter, reader)
+ gzWriter.Close()
+ } else {
+ _, err = io.Copy(w, reader)
+ }
return
- }, filename, isGzipped, mtype, pairMap, jwt)
+ }, filename, contentIsGzipped, mtype, pairMap, jwt)
}
+
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
@@ -58,9 +91,6 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if isGzipped {
h.Set("Content-Encoding", "gzip")
}
- if jwt != "" {
- h.Set("Authorization", "BEARER "+string(jwt))
- }
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
@@ -86,18 +116,15 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
for k, v := range pairMap {
req.Header.Set(k, v)
}
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
resp, post_err := client.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
}
defer resp.Body.Close()
-
- if resp.StatusCode < http.StatusOK ||
- resp.StatusCode > http.StatusIMUsed {
- return nil, errors.New(http.StatusText(resp.StatusCode))
- }
-
etag := getEtag(resp)
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {