| field | value | date |
|---|---|---|
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-10-10 13:07:57 +0800 |
| committer | GitHub <noreply@github.com> | 2020-10-10 13:07:57 +0800 |
| commit | 411e49f96494629fa3da334e8dc7cc15bd4d19cd (patch) | |
| tree | 1756f0a2f86c871a6db67df7ecbd509c9df32233 /weed/filer | |
| parent | ac162fc85769cb1b2a1f8694f9644eae7d0ce6c8 (diff) | |
| parent | 4a15e9c830de1b654515308e5be8380ffa34aefa (diff) | |
| download | seaweedfs-411e49f96494629fa3da334e8dc7cc15bd4d19cd.tar.xz, seaweedfs-411e49f96494629fa3da334e8dc7cc15bd4d19cd.zip | |
Merge pull request #23 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 36 |
| -rw-r--r-- | weed/filer/filechunks.go | 12 |
| -rw-r--r-- | weed/filer/filer_delete_entry.go | 2 |
| -rw-r--r-- | weed/filer/reader_at.go | 138 |
| -rw-r--r-- | weed/filer/stream.go | 48 |
5 files changed, 163 insertions, 73 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 37b172357..9e53e008f 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"time"
 
 	"github.com/golang/protobuf/proto"
 
@@ -84,21 +85,40 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
 
 // TODO fetch from cache for weed mount?
 func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
-	urlString, err := lookupFileIdFn(fileId)
+	urlStrings, err := lookupFileIdFn(fileId)
 	if err != nil {
 		glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
 		return nil, err
 	}
+	return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+}
+
+func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+
+	var err error
 	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
-		buffer.Write(data)
-	})
-	if err != nil {
-		glog.V(0).Infof("read %s failed, err: %v", fileId, err)
-		return nil, err
+
+	for waitTime := time.Second; waitTime < 10*time.Second; waitTime += waitTime / 2 {
+		for _, urlString := range urlStrings {
+			err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+				buffer.Write(data)
+			})
+			if err != nil {
+				glog.V(0).Infof("read %s failed, err: %v", urlString, err)
+				buffer.Reset()
+			} else {
+				break
+			}
+		}
+		if err != nil {
+			glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
+			time.Sleep(waitTime)
+		} else {
+			break
+		}
 	}
 
-	return buffer.Bytes(), nil
+	return buffer.Bytes(), err
 }
 
 func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index db55eec00..c75a35f79 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -1,13 +1,15 @@
 package filer
 
 import (
+	"bytes"
+	"encoding/hex"
 	"fmt"
-	"hash/fnv"
 	"math"
 	"sort"
 	"sync"
 
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
@@ -42,12 +44,12 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
 	if len(chunks) == 1 {
 		return chunks[0].ETag
 	}
-
-	h := fnv.New32a()
+	md5_digests := [][]byte{}
 	for _, c := range chunks {
-		h.Write([]byte(c.ETag))
+		md5_decoded, _ := hex.DecodeString(c.ETag)
+		md5_digests = append(md5_digests, md5_decoded)
 	}
-	return fmt.Sprintf("%x", h.Sum32())
+	return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
 }
 
 func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 6c9ff56d3..69219fbfa 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -77,7 +77,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
 		if lastFileName == "" && !isRecursive && len(entries) > 0 {
 			// only for first iteration in the loop
 			glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
-			return nil, nil,fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+			return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
 		}
 
 		for _, sub := range entries {
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 9f338782e..fa51df687 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -3,52 +3,72 @@ package filer
 import (
 	"context"
 	"fmt"
-	"io"
-	"sync"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
+	"github.com/golang/groupcache/singleflight"
+	"io"
+	"math/rand"
+	"sync"
 )
 
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   []*ChunkView
-	lookupFileId func(fileId string) (targetUrl string, err error)
+	lookupFileId LookupFileIdFunctionType
 	readerLock   sync.Mutex
 	fileSize     int64
 
-	chunkCache chunk_cache.ChunkCache
+	fetchGroup      singleflight.Group
+	lastChunkFileId string
+	lastChunkData   []byte
+	chunkCache      chunk_cache.ChunkCache
 }
 
 // var _ = io.ReaderAt(&ChunkReadAt{})
 
-type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
 
 func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
-	return func(fileId string) (targetUrl string, err error) {
-		err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-			vid := VolumeId(fileId)
-			resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
-				VolumeIds: []string{vid},
-			})
-			if err != nil {
-				return err
-			}
-
-			locations := resp.LocationsMap[vid]
-			if locations == nil || len(locations.Locations) == 0 {
-				glog.V(0).Infof("failed to locate %s", fileId)
-				return fmt.Errorf("failed to locate %s", fileId)
-			}
+	vidCache := make(map[string]*filer_pb.Locations)
+	return func(fileId string) (targetUrls []string, err error) {
+		vid := VolumeId(fileId)
+		locations, found := vidCache[vid]
+
+		if !found {
+			// println("looking up volume", vid)
+			err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+				resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+					VolumeIds: []string{vid},
+				})
+				if err != nil {
+					return err
+				}
+
+				locations = resp.LocationsMap[vid]
+				if locations == nil || len(locations.Locations) == 0 {
+					glog.V(0).Infof("failed to locate %s", fileId)
+					return fmt.Errorf("failed to locate %s", fileId)
+				}
+				vidCache[vid] = locations
+
+				return nil
+			})
+		}
 
-			volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+		for _, loc := range locations.Locations {
+			volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
+			targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+			targetUrls = append(targetUrls, targetUrl)
+		}
 
-			targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+		for i := len(targetUrls) - 1; i > 0; i-- {
+			j := rand.Intn(i + 1)
+			targetUrls[i], targetUrls[j] = targetUrls[j], targetUrls[i]
+		}
 
-			return nil
-		})
 		return
 	}
 }
@@ -76,10 +96,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
 	var buffer []byte
 	startOffset, remaining := offset, int64(len(p))
+	var nextChunk *ChunkView
 	for i, chunk := range c.chunkViews {
 		if remaining <= 0 {
 			break
 		}
+		if i+1 < len(c.chunkViews) {
+			nextChunk = c.chunkViews[i+1]
+		} else {
+			nextChunk = nil
+		}
 		if startOffset < chunk.LogicOffset {
 			gap := int(chunk.LogicOffset - startOffset)
 			glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
@@ -95,7 +121,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 			continue
 		}
 		glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
-		buffer, err = c.readFromWholeChunkData(chunk)
+		buffer, err = c.readFromWholeChunkData(chunk, nextChunk)
 		if err != nil {
 			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
 			return
@@ -123,27 +149,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
 }
 
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
 
-	glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+	if c.lastChunkFileId == chunkView.FileId {
+		return c.lastChunkData, nil
+	}
 
-	chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
-	if chunkData != nil {
-		glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData)))
-	} else {
-		glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId)
-		chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
-		if err != nil {
-			return
+	v, doErr := c.readOneWholeChunk(chunkView)
+
+	if doErr != nil {
+		return nil, doErr
+	}
+
+	chunkData = v.([]byte)
+
+	c.lastChunkData = chunkData
+	c.lastChunkFileId = chunkView.FileId
+
+	for _, nextChunkView := range nextChunkViews {
+		if c.chunkCache != nil && nextChunkView != nil {
+			go c.readOneWholeChunk(nextChunkView)
 		}
-		c.chunkCache.SetChunk(chunkView.FileId, chunkData)
 	}
 
 	return
 }
 
-func (c *ChunkReadAt) doFetchFullChunkData(fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
+
+	var err error
+
+	return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
+
+		glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
+
+		data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+		if data != nil {
+			glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
+		} else {
+			var err error
+			data, err = c.doFetchFullChunkData(chunkView)
+			if err != nil {
+				return data, err
+			}
+			c.chunkCache.SetChunk(chunkView.FileId, data)
+		}
+		return data, err
+	})
+}
+
+func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
+
+	glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
+
+	data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
+
+	glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
 
-	return fetchChunk(c.lookupFileId, fileId, cipherKey, isGzipped)
+	return data, err
 }
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index dc6e414ca..f6e2a7643 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -17,28 +17,28 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
 
 	// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
 	chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
 
-	fileId2Url := make(map[string]string)
+	fileId2Url := make(map[string][]string)
 
 	for _, chunkView := range chunkViews {
-		urlString, err := masterClient.LookupFileId(chunkView.FileId)
+		urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return err
 		}
-		fileId2Url[chunkView.FileId] = urlString
+		fileId2Url[chunkView.FileId] = urlStrings
 	}
 
 	for _, chunkView := range chunkViews {
 
-		urlString := fileId2Url[chunkView.FileId]
-		err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-			w.Write(data)
-		})
-		if err != nil {
-			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+		urlStrings := fileId2Url[chunkView.FileId]
+
+		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+		if err == nil {
 			return err
 		}
+		w.Write(data)
+
 	}
 
 	return nil
@@ -51,25 +51,24 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 
 	buffer := bytes.Buffer{}
 
-	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
 
 	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	for _, chunkView := range chunkViews {
-		urlString, err := lookupFileIdFn(chunkView.FileId)
+		urlStrings, err := lookupFileIdFn(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
 		}
-		err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-			buffer.Write(data)
-		})
+
+		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
 		if err != nil {
-			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
 		}
+		buffer.Write(data)
 	}
 	return buffer.Bytes(), nil
 }
@@ -89,7 +88,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
 
 func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
 
@@ -169,17 +168,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
 }
 
 func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
-	urlString, err := c.lookupFileId(chunkView.FileId)
+	urlStrings, err := c.lookupFileId(chunkView.FileId)
 	if err != nil {
 		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 		return err
 	}
 	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-		buffer.Write(data)
-	})
+	for _, urlString := range urlStrings {
+		err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+			buffer.Write(data)
+		})
+		if err != nil {
+			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+			buffer.Reset()
+		} else {
+			break
+		}
+	}
 	if err != nil {
-		glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
 		return err
 	}
 	c.buffer = buffer.Bytes()
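The retry loop added in filechunk_manifest.go is the core of this change: try each replica URL in turn, reset the buffer after a failed attempt, and if every replica fails, sleep with a wait that grows by half each round (1s, 1.5s, 2.25s, ... staying under 10s) before retrying the whole set. Below is a minimal standalone sketch of that pattern; `fetchOnce` is a hypothetical stand-in for `util.ReadUrlAsStream`, not a SeaweedFS API.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// fetchOnce stands in for a single-URL read such as util.ReadUrlAsStream.
type fetchOnce func(url string, buf *bytes.Buffer) error

// fetchWithRetry mirrors the shape of retriedFetchChunkData: walk the replica
// URLs, discard partial data after a failure, and back off between full rounds
// with a wait time that grows by 50% until it would reach 10 seconds.
func fetchWithRetry(urls []string, fetch fetchOnce) ([]byte, error) {
	var err error
	var buffer bytes.Buffer

	for waitTime := time.Second; waitTime < 10*time.Second; waitTime += waitTime / 2 {
		for _, url := range urls {
			if err = fetch(url, &buffer); err != nil {
				buffer.Reset() // drop any partially written data before the next attempt
				continue
			}
			return buffer.Bytes(), nil
		}
		time.Sleep(waitTime)
	}
	return nil, fmt.Errorf("all %d replicas failed, last error: %v", len(urls), err)
}

func main() {
	calls := 0
	flaky := func(url string, buf *bytes.Buffer) error {
		calls++
		if calls < 3 {
			return fmt.Errorf("simulated failure from %s", url)
		}
		buf.WriteString("chunk data")
		return nil
	}
	data, err := fetchWithRetry([]string{"http://volume-a/3,01", "http://volume-b/3,01"}, flaky)
	fmt.Println(string(data), err)
}
```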

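reader_at.go now routes every whole-chunk fetch through a golang/groupcache `singleflight.Group` keyed by file id, so concurrent readers of the same chunk, including the read-ahead goroutine started for the next chunk, share one download. The snippet below is a small self-contained illustration of that deduplication only, not the ChunkReadAt code itself.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/groupcache/singleflight"
)

func main() {
	var group singleflight.Group
	var fetches int32

	// Ten goroutines ask for the same chunk key at once; singleflight lets the
	// first caller run the expensive fetch and hands its result to the rest,
	// so the fetch typically runs only once.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err := group.Do("3,0123456789", func() (interface{}, error) {
				atomic.AddInt32(&fetches, 1)
				time.Sleep(10 * time.Millisecond) // simulate a volume server read
				return []byte("chunk data"), nil
			})
			if err == nil {
				_ = v.([]byte)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("10 concurrent readers triggered %d fetch(es)\n", fetches)
}
```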