diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|---|---|---|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit | d861cbd81b75b6684c971ac00e33685e6575b833 (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/operation | |
| parent | 70da715d8d917527291b35fb069fac077d17b868 (diff) | |
| parent | 4ee58922eff61a5a4ca29c0b4829b097a498549e (diff) | |
| download | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip | |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 71 | ||||
| -rw-r--r-- | weed/operation/buffer_pool.go | 24 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 62 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 31 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 40 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 15 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 131 | ||||
| -rw-r--r-- | weed/operation/stats.go | 26 | ||||
| -rw-r--r-- | weed/operation/submit.go | 57 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 11 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 223 |
11 files changed, 523 insertions, 168 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 2dfa44483..ffd3e4938 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,11 +3,14 @@ package operation import ( "context" "fmt" + "strings" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strings" ) type VolumeAssignRequest struct { @@ -15,6 +18,7 @@ type VolumeAssignRequest struct { Replication string Collection string Ttl string + DiskType string DataCenter string Rack string DataNode string @@ -30,7 +34,7 @@ type AssignResult struct { Auth security.EncodedJwt `json:"auth,omitempty"` } -func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -44,17 +48,18 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ - Count: primaryRequest.Count, - Replication: primaryRequest.Replication, - Collection: primaryRequest.Collection, - Ttl: primaryRequest.Ttl, - DataCenter: primaryRequest.DataCenter, - Rack: primaryRequest.Rack, - DataNode: primaryRequest.DataNode, - WritableVolumeCount: primaryRequest.WritableVolumeCount, + Count: request.Count, + Replication: request.Replication, + Collection: request.Collection, + Ttl: request.Ttl, + DiskType: request.DiskType, + DataCenter: request.DataCenter, + Rack: request.Rack, + DataNode: request.DataNode, + WritableVolumeCount: request.WritableVolumeCount, } resp, grpcErr := masterClient.Assign(context.Background(), req) if grpcErr != nil { @@ -81,6 +86,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } + break } return ret, lastError @@ -99,3 +105,44 @@ func LookupJwt(master string, fileId string) security.EncodedJwt { return security.EncodedJwt(tokenStr) } + +type StorageOption struct { + Replication string + DiskType string + Collection string + DataCenter string + Rack string + TtlSeconds int32 + Fsync bool + VolumeGrowthCount uint32 +} + +func (so *StorageOption) TtlString() string { + return needle.SecondsToTTL(so.TtlSeconds) +} + +func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) { + ar = &VolumeAssignRequest{ + Count: uint64(count), + Replication: so.Replication, + Collection: so.Collection, + Ttl: so.TtlString(), + DiskType: so.DiskType, + DataCenter: so.DataCenter, + Rack: so.Rack, + WritableVolumeCount: so.VolumeGrowthCount, + } + if so.DataCenter != "" || so.Rack != "" { + altRequest = &VolumeAssignRequest{ + Count: uint64(count), + Replication: so.Replication, + Collection: so.Collection, + Ttl: so.TtlString(), + DiskType: so.DiskType, + DataCenter: "", + Rack: "", + WritableVolumeCount: so.VolumeGrowthCount, + } + } + return +} diff --git a/weed/operation/buffer_pool.go b/weed/operation/buffer_pool.go new file mode 100644 index 000000000..9cbe4787f --- /dev/null +++ b/weed/operation/buffer_pool.go @@ -0,0 +1,24 @@ +package operation + +import ( + "github.com/valyala/bytebufferpool" + "sync/atomic" +) + +var bufferCounter int64 + +func GetBuffer() *bytebufferpool.ByteBuffer { + defer func() { + atomic.AddInt64(&bufferCounter, 1) + // println("+", bufferCounter) + }() + return bytebufferpool.Get() +} + +func PutBuffer(buf *bytebufferpool.ByteBuffer) { + defer func() { + atomic.AddInt64(&bufferCounter, -1) + // println("-", bufferCounter) + }() + bytebufferpool.Put(buf) +} diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 295204dd8..8506e0518 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -8,11 +8,10 @@ import ( "io/ioutil" "net/http" "sort" + "sync" "google.golang.org/grpc" - "sync" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -41,23 +40,24 @@ type ChunkManifest struct { // seekable chunked file reader type ChunkedFileReader struct { - Manifest *ChunkManifest - Master string - pos int64 - pr *io.PipeReader - pw *io.PipeWriter - mutex sync.Mutex + totalSize int64 + chunkList []*ChunkInfo + master string + pos int64 + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex } func (s ChunkList) Len() int { return len(s) } func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset } func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) { - if isGzipped { +func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) { + if isCompressed { var err error - if buffer, err = util.UnGzipData(buffer); err != nil { - return nil, err + if buffer, err = util.DecompressData(buffer); err != nil { + glog.V(0).Infof("fail to decompress chunk manifest: %v", err) } } cm := ChunkManifest{} @@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error { +func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error { var fileIds []string for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(master, grpcDialOption, fileIds) + results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) @@ -126,16 +126,29 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, return io.Copy(w, resp.Body) } +func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileReader { + var totalSize int64 + for _, chunk := range chunkList { + totalSize += chunk.Size + } + sort.Sort(ChunkList(chunkList)) + return &ChunkedFileReader{ + totalSize: totalSize, + chunkList: chunkList, + master: master, + } +} + func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: - case 1: + case io.SeekStart: + case io.SeekCurrent: offset += cf.pos - case 2: - offset = cf.Manifest.Size - offset + case io.SeekEnd: + offset = cf.totalSize + offset } - if offset > cf.Manifest.Size { + if offset > cf.totalSize { err = ErrInvalidRange } if cf.pos != offset { @@ -146,10 +159,9 @@ func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) { } func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { - cm := cf.Manifest chunkIndex := -1 chunkStartOffset := int64(0) - for i, ci := range cm.Chunks { + for i, ci := range cf.chunkList { if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size { chunkIndex = i chunkStartOffset = cf.pos - ci.Offset @@ -159,10 +171,12 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { if chunkIndex < 0 { return n, ErrInvalidRange } - for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ { - ci := cm.Chunks[chunkIndex] + for ; chunkIndex < len(cf.chunkList); chunkIndex++ { + ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid) + fileUrl, lookupError := LookupFileId(func() string { + return cf.master + }, ci.Fid) if lookupError != nil { return n, lookupError } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 358399324..8f87882b1 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" "net/http" "strings" "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) type DeleteResult struct { @@ -28,10 +28,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { - - lookupFunc := func(vids []string) (map[string]LookupResult, error) { - return LookupVolumeIds(master, grpcDialOption, vids) +func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { + + lookupFunc := func(vids []string) (results map[string]LookupResult, err error) { + results, err = LookupVolumeIds(masterFn, grpcDialOption, vids) + if err == nil && usePublicUrl { + for _, result := range results { + for _, loc := range result.Locations { + loc.Url = loc.PublicUrl + } + } + } + return } return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) @@ -92,9 +100,9 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str go func(server string, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil { + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, true); deleteErr != nil { err = deleteErr - } else { + } else if deleteResults != nil { resultChan <- deleteResults } @@ -107,18 +115,17 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str ret = append(ret, result...) } - glog.V(1).Infof("deleted %d items", len(ret)) - return ret, err } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ - FileIds: fileIds, + FileIds: fileIds, + SkipCookieCheck: !includeCookie, } resp, err := volumeServerClient.BatchDelete(context.Background(), req) diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index f6b2b69e9..025a65b38 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -1,27 +1,27 @@ package operation import ( - "context" "fmt" + "strconv" + "strings" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strconv" - "strings" ) func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - ctx := context.Background() - grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) if err != nil { - return err + return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, grpcAddress, grpcDialOption) @@ -40,16 +40,28 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - ctx := context.Background() - - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) if parseErr != nil { - return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) + return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress, grpcDialOption) } + +func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer) + if parseErr != nil { + return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) + } + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filerGrpcAddress, grpcDialOption) + +} diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index d0773e7fd..0372e47b0 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -33,10 +33,10 @@ var ( vc VidCache // caching of volume locations, re-check if after 10 minutes ) -func Lookup(server string, vid string) (ret *LookupResult, err error) { +func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) { locations, cache_err := vc.Get(vid) if cache_err != nil { - if ret, err = do_lookup(server, vid); err == nil { + if ret, err = do_lookup(masterFn, vid); err == nil { vc.Set(vid, ret.Locations, 10*time.Minute) } } else { @@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) { return } -func do_lookup(server string, vid string) (*LookupResult, error) { +func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) { values := make(url.Values) values.Add("volumeId", vid) + server := masterFn() jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) if err != nil { return nil, err @@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) { return &ret, nil } -func LookupFileId(server string, fileId string) (fullUrl string, err error) { +func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) { parts := strings.Split(fileId, ",") if len(parts) != 2 { return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, parts[0]) + lookup, lookupError := Lookup(masterFn, parts[0]) if lookupError != nil { return "", lookupError } @@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { } // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { +func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) var unknown_vids []string @@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go new file mode 100644 index 000000000..202374e1b --- /dev/null +++ b/weed/operation/needle_parse_test.go @@ -0,0 +1,131 @@ +package operation + +import ( + "bytes" + "fmt" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MockClient struct { + needleHandling func(n *needle.Needle, originalSize int, e error) +} + +func (m *MockClient) Do(req *http.Request) (*http.Response, error) { + n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024) + if m.needleHandling != nil { + m.needleHandling(n, originalSize, err) + } + return &http.Response{ + StatusCode: http.StatusNoContent, + }, io.EOF +} + +/* + +The mime type is always the value passed in. + +Compress or not depends on the content detection, file name extension, and compression ratio. + +If the content is already compressed, need to know the content size. + +*/ + +func TestCreateNeedleFromRequest(t *testing.T) { + mc := &MockClient{} + tmp := HttpClient + HttpClient = mc + defer func() { + HttpClient = tmp + }() + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") + fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize) + } + uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "") + if len(data) != len(textContent) { + t.Errorf("data actual %d expected %d", len(data), len(textContent)) + } + if err != nil { + fmt.Printf("err: %v\n", err) + } + fmt.Printf("uploadResult: %+v\n", uploadResult) + } + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + gzippedData, _ := util.GzipData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "") + } + + /* + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, true, n.IsCompressed(), "this should be compressed") + assert.Equal(t, true, util.IsZstdContent(n.Data), "this should be zstd") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + zstdData, _ := util.ZstdData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), true, "text/plain", nil, "") + } + + { + mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + assert.Equal(t, nil, err, "upload: %v", err) + assert.Equal(t, "application/zstd", string(n.Mime), "mime detection failed: %v", string(n.Mime)) + assert.Equal(t, false, n.IsCompressed(), "this should not be compressed") + assert.Equal(t, true, util.IsZstdContent(n.Data), "this should still be zstd") + fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize) + } + zstdData, _ := util.ZstdData([]byte(textContent)) + Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "") + } + */ + +} + +var textContent = `Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +` diff --git a/weed/operation/stats.go b/weed/operation/stats.go deleted file mode 100644 index b69a33750..000000000 --- a/weed/operation/stats.go +++ /dev/null @@ -1,26 +0,0 @@ -package operation - -import ( - "context" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" -) - -func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - - err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - - grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req) - if grpcErr != nil { - return grpcErr - } - - resp = grpcResponse - - return nil - - }) - - return -} diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 62f067430..87c5e4279 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,8 +1,6 @@ package operation import ( - "bytes" - "google.golang.org/grpc" "io" "mime" "net/url" @@ -11,6 +9,8 @@ import ( "strconv" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -25,20 +25,23 @@ type FilePart struct { Collection string DataCenter string Ttl string + DiskType string Server string //this comes from assign result Fid string //this comes from assign result, but customizable + Fsync bool } type SubmitResult struct { FileName string `json:"fileName,omitempty"` - FileUrl string `json:"fileUrl,omitempty"` + FileUrl string `json:"url,omitempty"` Fid string `json:"fid,omitempty"` Size uint32 `json:"size,omitempty"` Error string `json:"error,omitempty"` } -func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, - replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) { +type GetMasterFn func() string + +func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -49,10 +52,11 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart Collection: collection, DataCenter: dataCenter, Ttl: ttl, + DiskType: diskType, } - ret, err := Assign(master, grpcDialOption, ar) + ret, err := Assign(masterFn, grpcDialOption, ar) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err @@ -63,10 +67,15 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart file.Fid = file.Fid + "_" + strconv.Itoa(index) } file.Server = ret.Url + if usePublicUrl { + file.Server = ret.PublicUrl + } file.Replication = replication file.Collection = collection file.DataCenter = dataCenter - results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption) + file.Ttl = ttl + file.DiskType = diskType + results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -109,11 +118,14 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) } + if fi.Fsync { + fileUrl += "?fsync=true" + } if closer, ok := fi.Reader.(io.Closer); ok { defer closer.Close() } @@ -136,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { return } @@ -149,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return } id = ret.Fid @@ -164,14 +178,17 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp } } fileUrl := "http://" + ret.Url + "/" + id + if usePublicUrl { + fileUrl = "http://" + ret.PublicUrl + "/" + id + } count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fileUrl, + masterFn, fileUrl, ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -186,10 +203,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) } } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt) + ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -198,12 +215,11 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp return } -func upload_one_chunk(filename string, reader io.Reader, master, +func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "", nil, jwt) + uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) if uploadError != nil { return 0, uploadError } @@ -215,12 +231,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s if e != nil { return e } - bufReader := bytes.NewReader(buf) glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") u, _ := url.Parse(fileUrl) q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt) + _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt) return e } diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index b53f18ce1..045948274 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -5,14 +5,15 @@ import ( "fmt" "io" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" ) -func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { +func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { // find volume location, replication, ttl info - lookup, err := Lookup(master, vid.String()) + lookup, err := Lookup(masterFn, vid.String()) if err != nil { return fmt.Errorf("look up volume %d: %v", vid, err) } @@ -27,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ + stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ VolumeId: uint32(vid), SinceNs: sinceNs, IdleTimeoutSeconds: uint32(idleTimeoutSeconds), diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index c387d0230..944186eeb 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,10 +2,7 @@ package operation import ( "bytes" - "compress/flate" - "compress/gzip" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -15,73 +12,188 @@ import ( "net/textproto" "path/filepath" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + ContentMd5 string `json:"contentMd5,omitempty"` + RetryCount int `json:"-"` +} + +func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { + fid, _ := filer_pb.ToFileIdObject(fileId) + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: offset, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + CipherKey: uploadResult.CipherKey, + IsCompressed: uploadResult.Gzip > 0, + Fid: fid, + } +} + +// HTTPClient interface for testing +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) } var ( - client *http.Client + HttpClient HTTPClient ) func init() { - client = &http.Client{Transport: &http.Transport{ + HttpClient = &http.Client{Transport: &http.Transport{ + MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} } -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") +var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) { - if compressionLevel < 1 { - compressionLevel = 1 - } - if compressionLevel > 9 { - compressionLevel = 9 - } - return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt) +func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt) +func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) + return +} + +func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + bytesReader, ok := reader.(*util.BytesReader) + if ok { + data = bytesReader.Bytes + } else { + data, err = ioutil.ReadAll(reader) + if err != nil { + err = fmt.Errorf("read input: %v", err) + return + } + } + uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + return uploadResult, uploadErr, data } -func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) { - contentIsGzipped := isGzipped +func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + for i := 0; i < 3; i++ { + uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + if err == nil { + uploadResult.RetryCount = i + return + } else { + glog.Warningf("uploading to %s: %v", uploadUrl, err) + } + time.Sleep(time.Millisecond * time.Duration(237*(i+1))) + } + return +} + +func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + contentIsGzipped := isInputCompressed shouldGzipNow := false - if !isGzipped { - if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { + if !isInputCompressed { + if mtype == "" { + mtype = http.DetectContentType(data) + // println("detect1 mimetype to", mtype) + if mtype == "application/octet-stream" { + mtype = "" + } + } + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed { shouldGzipNow = true + } else if !iAmSure && mtype == "" && len(data) > 16*1024 { + var compressed []byte + compressed, err = util.GzipData(data[0:128]) + shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90% + } + } + + var clearDataLen int + + // gzip if possible + // this could be double copying + clearDataLen = len(data) + clearData := data + if shouldGzipNow && !cipher { + compressed, compressErr := util.GzipData(data) + // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) + if compressErr == nil { + data = compressed contentIsGzipped = true } + } else if isInputCompressed { + // just to get the clear data length + clearData, err = util.DecompressData(data) + if err == nil { + clearDataLen = len(clearData) + } } - return upload_content(uploadUrl, func(w io.Writer) (err error) { - if shouldGzipNow { - gzWriter, _ := gzip.NewWriterLevel(w, compression) - _, err = io.Copy(gzWriter, reader) - gzWriter.Close() - } else { - _, err = io.Copy(w, reader) + + if cipher { + // encrypt(gzip(data)) + + // encrypt + cipherKey := util.GenCipherKey() + encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey) + if encryptionErr != nil { + err = fmt.Errorf("encrypt input: %v", encryptionErr) + return } - return - }, filename, contentIsGzipped, mtype, pairMap, jwt) + + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(encryptedData) + return + }, "", false, len(encryptedData), "", nil, jwt) + if uploadResult == nil { + return + } + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + uploadResult.Size = uint32(clearDataLen) + } else { + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(data) + return + }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) + if uploadResult == nil { + return + } + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } + } + + return uploadResult, err } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { + buf := GetBuffer() + defer PutBuffer(buf) + body_writer := multipart.NewWriter(buf) h := make(textproto.MIMEHeader) h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + h.Set("Idempotency-Key", uploadUrl) if mtype == "" { mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) } @@ -107,10 +219,10 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, err } - req, postErr := http.NewRequest("POST", uploadUrl, body_buf) + req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes())) if postErr != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error()) - return nil, postErr + glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr) + return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr) } req.Header.Set("Content-Type", content_type) for k, v := range pairMap { @@ -119,27 +231,42 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error if jwt != "" { req.Header.Set("Authorization", "BEARER "+string(jwt)) } - resp, post_err := client.Do(req) + // print("+") + resp, post_err := HttpClient.Do(req) if post_err != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) - return nil, post_err + if strings.Contains(post_err.Error(), "connection reset by peer") || + strings.Contains(post_err.Error(), "use of closed network connection") { + resp, post_err = HttpClient.Do(req) + } } - defer resp.Body.Close() + if post_err != nil { + return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err) + } + // print("-") + defer util.CloseResponse(resp) + + var ret UploadResult etag := getEtag(resp) + if resp.StatusCode == http.StatusNoContent { + ret.ETag = etag + return &ret, nil + } + resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - return nil, ra_err + return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err) } - var ret UploadResult + unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) - return nil, unmarshal_err + glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body)) + return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err) } if ret.Error != "" { - return nil, errors.New(ret.Error) + return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error) } ret.ETag = etag + ret.ContentMd5 = resp.Header.Get("Content-MD5") return &ret, nil } |
