diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 70 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 64 | ||||
| -rw-r--r-- | weed/operation/compress.go | 82 | ||||
| -rw-r--r-- | weed/operation/data_struts.go | 1 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 49 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 44 | ||||
| -rw-r--r-- | weed/operation/list_masters.go | 32 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 9 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 130 | ||||
| -rw-r--r-- | weed/operation/stats.go | 28 | ||||
| -rw-r--r-- | weed/operation/submit.go | 44 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 54 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 83 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 172 |
14 files changed, 539 insertions, 323 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 00e1caad5..893bf516c 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,30 +3,36 @@ package operation import ( "context" "fmt" - "time" + "strings" + + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) type VolumeAssignRequest struct { - Count uint64 - Replication string - Collection string - Ttl string - DataCenter string - Rack string - DataNode string + Count uint64 + Replication string + Collection string + Ttl string + DataCenter string + Rack string + DataNode string + WritableVolumeCount uint32 } type AssignResult struct { - Fid string `json:"fid,omitempty"` - Url string `json:"url,omitempty"` - PublicUrl string `json:"publicUrl,omitempty"` - Count uint64 `json:"count,omitempty"` - Error string `json:"error,omitempty"` + Fid string `json:"fid,omitempty"` + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` + Count uint64 `json:"count,omitempty"` + Error string `json:"error,omitempty"` + Auth security.EncodedJwt `json:"auth,omitempty"` } -func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -40,20 +46,19 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque continue } - lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ - Count: primaryRequest.Count, - Replication: primaryRequest.Replication, - Collection: primaryRequest.Collection, - Ttl: primaryRequest.Ttl, - DataCenter: primaryRequest.DataCenter, - Rack: primaryRequest.Rack, - DataNode: primaryRequest.DataNode, + Count: primaryRequest.Count, + Replication: primaryRequest.Replication, + Collection: primaryRequest.Collection, + Ttl: primaryRequest.Ttl, + DataCenter: primaryRequest.DataCenter, + Rack: primaryRequest.Rack, + DataNode: primaryRequest.DataNode, + WritableVolumeCount: primaryRequest.WritableVolumeCount, } - resp, grpcErr := masterClient.Assign(ctx, req) + resp, grpcErr := masterClient.Assign(context.Background(), req) if grpcErr != nil { return grpcErr } @@ -63,6 +68,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque ret.Url = resp.Url ret.PublicUrl = resp.PublicUrl ret.Error = resp.Error + ret.Auth = security.EncodedJwt(resp.Auth) return nil @@ -81,3 +87,17 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque return ret, lastError } + +func LookupJwt(master string, fileId string) security.EncodedJwt { + + tokenStr := "" + + if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil { + bearer := h.Get("Authorization") + if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" { + tokenStr = bearer[7:] + } + } + + return security.EncodedJwt(tokenStr) +} diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 9d8267dee..653b7bf13 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -5,11 +5,13 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "sort" - "sync" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -38,22 +40,23 @@ type ChunkManifest struct { // seekable chunked file reader type ChunkedFileReader struct { - Manifest *ChunkManifest - Master string - pos int64 - pr *io.PipeReader - pw *io.PipeWriter - mutex sync.Mutex + totalSize int64 + chunkList []*ChunkInfo + master string + pos int64 + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex } func (s ChunkList) Len() int { return len(s) } func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { - if isGzipped { +func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) { + if isCompressed { var err error - if buffer, err = UnGzipData(buffer); err != nil { + if buffer, err = util.DecompressData(buffer); err != nil { return nil, err } } @@ -69,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string) error { +func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error { var fileIds []string for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(master, fileIds) + results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) @@ -102,7 +105,10 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, if err != nil { return written, err } - defer resp.Body.Close() + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() switch resp.StatusCode { case http.StatusRequestedRangeNotSatisfiable: @@ -120,16 +126,29 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, return io.Copy(w, resp.Body) } +func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileReader { + var totalSize int64 + for _, chunk := range chunkList { + totalSize += chunk.Size + } + sort.Sort(ChunkList(chunkList)) + return &ChunkedFileReader{ + totalSize: totalSize, + chunkList: chunkList, + master: master, + } +} + func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: - case 1: + case io.SeekStart: + case io.SeekCurrent: offset += cf.pos - case 2: - offset = cf.Manifest.Size - offset + case io.SeekEnd: + offset = cf.totalSize + offset } - if offset > cf.Manifest.Size { + if offset > cf.totalSize { err = ErrInvalidRange } if cf.pos != offset { @@ -140,10 +159,9 @@ func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { } func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { - cm := cf.Manifest chunkIndex := -1 chunkStartOffset := int64(0) - for i, ci := range cm.Chunks { + for i, ci := range cf.chunkList { if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size { chunkIndex = i chunkStartOffset = cf.pos - ci.Offset @@ -153,10 +171,10 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { if chunkIndex < 0 { return n, ErrInvalidRange } - for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { - ci := cm.Chunks[chunkIndex] + for ; chunkIndex < len(cf.chunkList); chunkIndex++ { + ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) + fileUrl, lookupError := LookupFileId(cf.master, ci.Fid) if lookupError != nil { return n, lookupError } diff --git a/weed/operation/compress.go b/weed/operation/compress.go deleted file mode 100644 index 65979d529..000000000 --- a/weed/operation/compress.go +++ /dev/null @@ -1,82 +0,0 @@ -package operation - -import ( - "bytes" - "compress/flate" - "compress/gzip" - "io/ioutil" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "golang.org/x/tools/godoc/util" -) - -/* -* Default more not to gzip since gzip can be done on client side. - */ -func IsGzippable(ext, mtype string, data []byte) bool { - - // text - if strings.HasPrefix(mtype, "text/") { - return true - } - - // images - switch ext { - case ".svg", ".bmp": - return true - } - if strings.HasPrefix(mtype, "image/") { - return false - } - - // by file name extention - switch ext { - case ".zip", ".rar", ".gz", ".bz2", ".xz": - return false - case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json": - return true - case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp": - return true - case ".png", ".jpg", ".jpeg": - return false - } - - // by mime type - if strings.HasPrefix(mtype, "application/") { - if strings.HasSuffix(mtype, "xml") { - return true - } - if strings.HasSuffix(mtype, "script") { - return true - } - } - - isMostlyText := util.IsText(data) - - return isMostlyText -} - -func GzipData(input []byte) ([]byte, error) { - buf := new(bytes.Buffer) - w, _ := gzip.NewWriterLevel(buf, flate.BestCompression) - if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) - return nil, err - } - if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) - return nil, err - } - return buf.Bytes(), nil -} -func UnGzipData(input []byte) ([]byte, error) { - buf := bytes.NewBuffer(input) - r, _ := gzip.NewReader(buf) - defer r.Close() - output, err := ioutil.ReadAll(r) - if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) - } - return output, err -} diff --git a/weed/operation/data_struts.go b/weed/operation/data_struts.go index bfc53aa50..4980f9913 100644 --- a/weed/operation/data_struts.go +++ b/weed/operation/data_struts.go @@ -2,6 +2,5 @@ package operation type JoinResult struct { VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"` - SecretKey string `json:"secretKey,omitempty"` Error string `json:"error,omitempty"` } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 3e468e1a3..9868a411d 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -7,7 +7,8 @@ import ( "net/http" "strings" "sync" - "time" + + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) @@ -28,17 +29,25 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { - - lookupFunc := func(vids []string) (map[string]LookupResult, error) { - return LookupVolumeIds(master, vids) +func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { + + lookupFunc := func(vids []string) (results map[string]LookupResult, err error) { + results, err = LookupVolumeIds(master, grpcDialOption, vids) + if err == nil && usePublicUrl { + for _, result := range results { + for _, loc := range result.Locations { + loc.Url = loc.PublicUrl + } + } + } + return } - return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) } -func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { var ret []*volume_server_pb.DeleteResult @@ -48,7 +57,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin vid, _, err := ParseFileId(fileId) if err != nil { ret = append(ret, &volume_server_pb.DeleteResult{ - FileId: vid, + FileId: fileId, Status: http.StatusBadRequest, Error: err.Error()}, ) @@ -85,38 +94,42 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin } } + resultChan := make(chan []*volume_server_pb.DeleteResult, len(server_to_fileIds)) var wg sync.WaitGroup - for server, fidList := range server_to_fileIds { wg.Add(1) go func(server string, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil { + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, true); deleteErr != nil { err = deleteErr - } else { - ret = append(ret, deleteResults...) + } else if deleteResults != nil { + resultChan <- deleteResults } }(server, fidList) } wg.Wait() + close(resultChan) + + for result := range resultChan { + ret = append(ret, result...) + } return ret, err } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ - FileIds: fileIds, + FileIds: fileIds, + SkipCookieCheck: !includeCookie, } - resp, err := volumeServerClient.BatchDelete(ctx, req) + resp, err := volumeServerClient.BatchDelete(context.Background(), req) // fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp) diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index d0931a8d3..025a65b38 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -4,31 +4,27 @@ import ( "fmt" "strconv" "strings" - "sync" + + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" ) -var ( - grpcClients = make(map[string]*grpc.ClientConn) - grpcClientsLock sync.Mutex -) - -func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) if err != nil { - return err + return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err) } - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, grpcAddress) + }, grpcAddress, grpcDialOption) } @@ -42,16 +38,30 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) if parseErr != nil { - return fmt.Errorf("failed to parse master grpc %v", masterServer) + return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress) + }, masterGrpcAddress, grpcDialOption) + +} + +func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer) + if parseErr != nil { + return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) + } + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, grpcDialOption) } diff --git a/weed/operation/list_masters.go b/weed/operation/list_masters.go deleted file mode 100644 index 75838de4d..000000000 --- a/weed/operation/list_masters.go +++ /dev/null @@ -1,32 +0,0 @@ -package operation - -import ( - "encoding/json" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` -} - -func ListMasters(server string) (leader string, peers []string, err error) { - jsonBlob, err := util.Get("http://" + server + "/cluster/status") - glog.V(2).Info("list masters result :", string(jsonBlob)) - if err != nil { - return "", nil, err - } - var ret ClusterStatusResult - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return "", nil, err - } - peers = ret.Peers - if ret.IsLeader { - peers = append(peers, ret.Leader) - } - return ret.Leader, peers, nil -} diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 562a11580..d0773e7fd 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "math/rand" "net/url" "strings" @@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { } // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { +func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) var unknown_vids []string @@ -98,14 +99,12 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err //only query unknown_vids - err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, } - resp, grpcErr := masterClient.LookupVolume(ctx, req) + resp, grpcErr := masterClient.LookupVolume(context.Background(), req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go new file mode 100644 index 000000000..20a610eaa --- /dev/null +++ b/weed/operation/needle_parse_test.go @@ -0,0 +1,130 @@ +package operation + +import ( + "bytes" + "fmt" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MockClient struct { + needleHandling func(n *needle.Needle, originalSize int, e error) +} + +func (m *MockClient) Do(req *http.Request) (*http.Response, error) { + n, originalSize, err := needle.CreateNeedleFromRequest(req, 1024*1024) + if m.needleHandling != nil { + m.needleHandling(n, originalSize, err) + } + return &http.Response{ + StatusCode: http.StatusNoContent, + }, io.EOF +} + +/* + +The mime type is always the value passed in. + +Compress or not depends on the content detection, file name extension, and compression ratio. + +If the content is already compressed, need to know the content size. + +*/ + +func TestCreateNeedleFromRequest(t *testing.T) { + mc := &MockClient{} + tmp := HttpClient + HttpClient = mc + defer func() { + HttpClient = tmp + }() + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") + fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize) + } + uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "") + if len(data) != len(textContent) { + t.Errorf("data actual %d expected %d", len(data), len(textContent)) + } + if err != nil { + fmt.Printf("err: %v\n", err) + } + fmt.Printf("uploadResult: %+v\n", uploadResult) + } + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + gzippedData, _ := util.GzipData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "") + } + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsZstdContent(n.Data), "this should be zstd") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + zstdData, _ := util.ZstdData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), true, "text/plain", nil, "") + } + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "application/zstd", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, false, n.IsCompressed(), "this should not be compressed") + assert.Equal(t, true, util.IsZstdContent(n.Data), "this should still be zstd") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + zstdData, _ := util.ZstdData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "") + } + +} + + +var textContent = `Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +`
\ No newline at end of file diff --git a/weed/operation/stats.go b/weed/operation/stats.go deleted file mode 100644 index 364727272..000000000 --- a/weed/operation/stats.go +++ /dev/null @@ -1,28 +0,0 @@ -package operation - -import ( - "context" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" -) - -func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - - err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - grpcResponse, grpcErr := masterClient.Statistics(ctx, req) - if grpcErr != nil { - return grpcErr - } - - resp = grpcResponse - - return nil - - }) - - return -} diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 7a1a3085e..e8bec382a 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,7 +1,6 @@ package operation import ( - "bytes" "io" "mime" "net/url" @@ -10,6 +9,8 @@ import ( "strconv" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -26,6 +27,7 @@ type FilePart struct { Ttl string Server string //this comes from assign result Fid string //this comes from assign result, but customizable + Fsync bool } type SubmitResult struct { @@ -36,10 +38,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, - replication string, collection string, dataCenter string, ttl string, maxMB int, - secret security.Secret, -) ([]SubmitResult, error) { +func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -51,9 +50,9 @@ func SubmitFiles(master string, files []FilePart, DataCenter: dataCenter, Ttl: ttl, } - ret, err := Assign(master, ar) + ret, err := Assign(master, grpcDialOption, ar) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err @@ -64,10 +63,13 @@ func SubmitFiles(master string, files []FilePart, file.Fid = file.Fid + "_" + strconv.Itoa(index) } file.Server = ret.Url + if usePublicUrl { + file.Server = ret.PublicUrl + } file.Replication = replication file.Collection = collection file.DataCenter = dataCenter - results[index].Size, err = file.Upload(maxMB, master, secret) + results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -110,12 +112,14 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) { - jwt := security.GenJwt(secret, fi.Fid) +func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) } + if fi.Fsync { + fileUrl += "?fsync=true" + } if closer, ok := fi.Reader.(io.Closer); ok { defer closer.Close() } @@ -139,7 +143,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { return } @@ -152,10 +156,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, usePublicUrl, grpcDialOption) return } id = ret.Fid @@ -170,10 +174,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fileUrl, - jwt) + ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, usePublicUrl, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -188,10 +192,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, usePublicUrl, grpcDialOption) } } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt) + ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -204,8 +208,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "application/octet-stream", nil, jwt) + uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) if uploadError != nil { return 0, uploadError } @@ -217,12 +220,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s if e != nil { return e } - bufReader := bytes.NewReader(buf) glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") u, _ := url.Parse(fileUrl) q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt) + _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt) return e } diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index e40c7de41..5562f12ab 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -2,63 +2,19 @@ package operation import ( "context" - "fmt" - "io" - "time" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - . "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" ) -func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { +func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{ - VolumdId: vid, + resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ + VolumeId: vid, }) return nil }) return } - -func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error { - - return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{ - VolumdId: vid, - }) - if err != nil { - return err - } - - var indexFileContent []byte - - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("read index entries: %v", err) - } - indexFileContent = append(indexFileContent, resp.IndexFileContent...) - } - - dataSize := len(indexFileContent) - - for idx := 0; idx+NeedleEntrySize <= dataSize; idx += NeedleEntrySize { - line := indexFileContent[idx : idx+NeedleEntrySize] - key := BytesToNeedleId(line[:NeedleIdSize]) - offset := BytesToOffset(line[NeedleIdSize : NeedleIdSize+OffsetSize]) - size := util.BytesToUint32(line[NeedleIdSize+OffsetSize : NeedleIdSize+OffsetSize+SizeSize]) - eachEntryFn(key, offset, size) - } - - return nil - }) -} diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go new file mode 100644 index 000000000..3cd66b5da --- /dev/null +++ b/weed/operation/tail_volume.go @@ -0,0 +1,83 @@ +package operation + +import ( + "context" + "fmt" + "io" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { + // find volume location, replication, ttl info + lookup, err := Lookup(master, vid.String()) + if err != nil { + return fmt.Errorf("look up volume %d: %v", vid, err) + } + if len(lookup.Locations) == 0 { + return fmt.Errorf("unable to locate volume %d", vid) + } + + volumeServer := lookup.Locations[0].Url + + return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn) +} + +func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { + return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ + VolumeId: uint32(vid), + SinceNs: sinceNs, + IdleTimeoutSeconds: uint32(idleTimeoutSeconds), + }) + if err != nil { + return err + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + needleHeader := resp.NeedleHeader + needleBody := resp.NeedleBody + + if len(needleHeader) == 0 { + continue + } + + for !resp.IsLastChunk { + resp, recvErr = stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + needleBody = append(needleBody, resp.NeedleBody...) + } + + n := new(needle.Needle) + n.ParseNeedleHeader(needleHeader) + n.ReadNeedleBodyBytes(needleBody, needle.CurrentVersion) + + err = fn(n) + + if err != nil { + return err + } + + } + return nil + }) +} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 030bf5889..658588ec3 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -13,38 +14,166 @@ import ( "net/textproto" "path/filepath" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + Md5 string `json:"md5,omitempty"` +} + +func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: offset, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + CipherKey: uploadResult.CipherKey, + IsCompressed: uploadResult.Gzip > 0, + } +} + +// HTTPClient interface for testing +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) } var ( - client *http.Client + HttpClient HTTPClient ) func init() { - client = &http.Client{Transport: &http.Transport{ + HttpClient = &http.Client{Transport: &http.Transport{ MaxIdleConnsPerHost: 1024, }} } var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") -// Upload sends a POST request to a volume server to upload the content -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - return upload_content(uploadUrl, func(w io.Writer) (err error) { - _, err = io.Copy(w, reader) +// Upload sends a POST request to a volume server to upload the content with adjustable compression level +func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + if uploadResult != nil { + uploadResult.Md5 = util.Md5(data) + } + return +} + +// Upload sends a POST request to a volume server to upload the content with fast compression +func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + hash := md5.New() + reader = io.TeeReader(reader, hash) + uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) + if uploadResult != nil { + uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) + } + return +} + +func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + data, err = ioutil.ReadAll(reader) + if err != nil { + err = fmt.Errorf("read input: %v", err) + return + } + uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + return uploadResult, uploadErr, data +} + +func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + contentIsGzipped := isInputCompressed + shouldGzipNow := false + if !isInputCompressed { + if mtype == "" { + mtype = http.DetectContentType(data) + // println("detect1 mimetype to", mtype) + if mtype == "application/octet-stream" { + mtype = "" + } + } + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed { + shouldGzipNow = true + } else if !iAmSure && mtype == "" && len(data) > 128 { + var compressed []byte + compressed, err = util.GzipData(data[0:128]) + shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90% + } + } + + var clearDataLen int + + // gzip if possible + // this could be double copying + clearDataLen = len(data) + if shouldGzipNow { + compressed, compressErr := util.GzipData(data) + // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) + if compressErr == nil { + data = compressed + contentIsGzipped = true + } + } else if isInputCompressed { + // just to get the clear data length + clearData, err := util.DecompressData(data) + if err == nil { + clearDataLen = len(clearData) + } + } + + if cipher { + // encrypt(gzip(data)) + + // encrypt + cipherKey := util.GenCipherKey() + encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + if encryptionErr != nil { + err = fmt.Errorf("encrypt input: %v", encryptionErr) + return + } + + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(encryptedData) + return + }, "", false, len(encryptedData), "", nil, jwt) + if uploadResult != nil { + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + } + } else { + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(data) + return + }, filename, contentIsGzipped, 0, mtype, pairMap, jwt) + } + + if uploadResult == nil { return - }, filename, isGzipped, mtype, pairMap, jwt) + } + + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } + + return uploadResult, err } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { + +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) h := make(textproto.MIMEHeader) @@ -58,9 +187,6 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error if isGzipped { h.Set("Content-Encoding", "gzip") } - if jwt != "" { - h.Set("Authorization", "BEARER "+string(jwt)) - } file_writer, cp_err := body_writer.CreatePart(h) if cp_err != nil { @@ -86,24 +212,26 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error for k, v := range pairMap { req.Header.Set(k, v) } - resp, post_err := client.Do(req) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + resp, post_err := HttpClient.Do(req) if post_err != nil { glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) return nil, post_err } defer resp.Body.Close() - if resp.StatusCode < http.StatusOK || - resp.StatusCode > http.StatusIMUsed { - return nil, errors.New(http.StatusText(resp.StatusCode)) - } - + var ret UploadResult etag := getEtag(resp) + if resp.StatusCode == http.StatusNoContent { + ret.ETag = etag + return &ret, nil + } resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { return nil, ra_err } - var ret UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) |
