Diffstat (limited to 'weed/filesys/dirty_page.go')
-rw-r--r--	weed/filesys/dirty_page.go	185
1 file changed, 92 insertions, 93 deletions
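
This commit replaces the single contiguous write buffer in ContinuousDirtyPages (the Offset/Size/Data fields backed by a pooled []byte) with a ContinuousIntervals structure that holds several disjoint dirty ranges at once, so out-of-order writes no longer force an immediate flush. AddPage now accumulates intervals and saves the largest one whenever the total dirty size exceeds ChunkSizeLimit; the flush paths return slices of chunks; the context.Context plumbing is dropped; saveToStorage takes an io.Reader and an explicit size instead of a []byte; and uploads gain cipher support, recorded as CipherKey and IsGzipped on the returned FileChunk. A new ReadDirtyData method exposes not-yet-flushed bytes to readers.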
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index baee412b2..7e33c97a7 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -4,8 +4,8 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -15,113 +15,68 @@ import (
 )
 
 type ContinuousDirtyPages struct {
-	hasData bool
-	Offset  int64
-	Size    int64
-	Data    []byte
-	f       *File
-	lock    sync.Mutex
+	intervals   *ContinuousIntervals
+	f           *File
+	lock        sync.Mutex
+	collection  string
+	replication string
 }
 
 func newDirtyPages(file *File) *ContinuousDirtyPages {
 	return &ContinuousDirtyPages{
-		Data: nil,
-		f:    file,
+		intervals: &ContinuousIntervals{},
+		f:         file,
 	}
 }
 
 func (pages *ContinuousDirtyPages) releaseResource() {
-	if pages.Data != nil {
-		pages.f.wfs.bufPool.Put(pages.Data)
-		pages.Data = nil
-		atomic.AddInt32(&counter, -1)
-		glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter)
-	}
 }
 
 var counter = int32(0)
 
-func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 
 	pages.lock.Lock()
 	defer pages.lock.Unlock()
 
-	var chunk *filer_pb.FileChunk
+	glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
 
 	if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
 		// this is more than what buffer can hold.
-		return pages.flushAndSave(ctx, offset, data)
-	}
-
-	if pages.Data == nil {
-		pages.Data = pages.f.wfs.bufPool.Get().([]byte)
-		atomic.AddInt32(&counter, 1)
-		glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter)
+		return pages.flushAndSave(offset, data)
 	}
 
-	if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
-		pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
-		// if the data is out of range,
-		// or buffer is full if adding new data,
-		// flush current buffer and add new data
+	pages.intervals.AddInterval(data, offset)
 
-		// println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
+	var chunk *filer_pb.FileChunk
+	var hasSavedData bool
 
-		if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
-			if chunk != nil {
-				glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
-				chunks = append(chunks, chunk)
-			}
-		} else {
-			glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
-			return
+	if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
+		chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
+		if hasSavedData {
+			chunks = append(chunks, chunk)
 		}
-		pages.Offset = offset
-		copy(pages.Data, data)
-		pages.Size = int64(len(data))
-		return
 	}
 
-	if offset != pages.Offset+pages.Size {
-		// when this happens, debug shows the data overlapping with existing data is empty
-		// the data is not just append
-		if offset == pages.Offset && int(pages.Size) < len(data) {
-			// glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size)
-			copy(pages.Data[pages.Size:], data[pages.Size:])
-		} else {
-			if pages.Size != 0 {
-				glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
-			}
-			return pages.flushAndSave(ctx, offset, data)
-		}
-	} else {
-		copy(pages.Data[offset-pages.Offset:], data)
-	}
-
-	pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
 	return
 }
 
-func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 
 	var chunk *filer_pb.FileChunk
+	var newChunks []*filer_pb.FileChunk
 
 	// flush existing
-	if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
-		if chunk != nil {
-			glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
-			chunks = append(chunks, chunk)
+	if newChunks, err = pages.saveExistingPagesToStorage(); err == nil {
+		if newChunks != nil {
+			chunks = append(chunks, newChunks...)
 		}
 	} else {
-		glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
 		return
 	}
-	pages.Size = 0
-	pages.Offset = 0
 
 	// flush the new page
-	if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+	if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil {
 		if chunk != nil {
 			glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
 			chunks = append(chunks, chunk)
@@ -134,40 +89,62 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
 	return
 }
 
-func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
 
 	pages.lock.Lock()
 	defer pages.lock.Unlock()
 
-	if pages.Size == 0 {
-		return nil, nil
-	}
+	return pages.saveExistingPagesToStorage()
+}
 
-	if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
-		pages.Size = 0
-		pages.Offset = 0
-		if chunk != nil {
-			glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
+
+	var hasSavedData bool
+	var chunk *filer_pb.FileChunk
+
+	for {
+
+		chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
+		if !hasSavedData {
+			return chunks, err
+		}
+
+		if err == nil {
+			chunks = append(chunks, chunk)
+		} else {
+			return
 		}
 	}
 
-	return
 }
 
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
 
-	if pages.Size == 0 {
-		return nil, nil
+	maxList := pages.intervals.RemoveLargestIntervalLinkedList()
+	if maxList == nil {
+		return nil, false, nil
 	}
 
-	return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+	chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+	if err == nil {
+		hasSavedData = true
+		glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+	} else {
+		glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+		return
+	}
+
+	return
 }
 
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
 
 	var fileId, host string
 	var auth security.EncodedJwt
 
-	if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+	dir, _ := pages.f.fullpath().DirAndName()
+
+	if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.AssignVolumeRequest{
 			Count:       1,
@@ -175,15 +152,21 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
 			Collection:  pages.f.wfs.option.Collection,
 			TtlSec:      pages.f.wfs.option.TtlSec,
 			DataCenter:  pages.f.wfs.option.DataCenter,
+			ParentPath:  dir,
 		}
 
-		resp, err := client.AssignVolume(ctx, request)
+		resp, err := client.AssignVolume(context.Background(), request)
 		if err != nil {
 			glog.V(0).Infof("assign volume failure %v: %v", request, err)
 			return err
 		}
+		if resp.Error != "" {
+			return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+		}
 
 		fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+		host = pages.f.wfs.AdjustedUrl(host)
+		pages.collection, pages.replication = resp.Collection, resp.Replication
 
 		return nil
 	}); err != nil {
@@ -191,8 +174,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
 	}
 
 	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-	bufReader := bytes.NewReader(buf)
-	uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth)
+	uploadResult, err := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
 	if err != nil {
 		glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
 		return nil, fmt.Errorf("upload data: %v", err)
@@ -203,11 +185,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
 	}
 
 	return &filer_pb.FileChunk{
-		FileId: fileId,
-		Offset: offset,
-		Size:   uint64(len(buf)),
-		Mtime:  time.Now().UnixNano(),
-		ETag:   uploadResult.ETag,
+		FileId:    fileId,
+		Offset:    offset,
+		Size:      uint64(size),
+		Mtime:     time.Now().UnixNano(),
+		ETag:      uploadResult.ETag,
+		CipherKey: uploadResult.CipherKey,
+		IsGzipped: uploadResult.Gzip > 0,
 	}, nil
 }
 
@@ -218,3 +202,18 @@ func max(x, y int64) int64 {
 	}
 	return y
 }
+func min(x, y int64) int64 {
+	if x < y {
+		return x
+	}
+	return y
+}
+
+func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
+
+	pages.lock.Lock()
+	defer pages.lock.Unlock()
+
+	return pages.intervals.ReadData(data, startOffset)
+
+}
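
The diff shows only the call sites of ContinuousIntervals (AddInterval, TotalSize, RemoveLargestIntervalLinkedList, ReadData); the type itself is defined elsewhere in the package. A minimal sketch of the bookkeeping those calls imply, coalescing writes into contiguous runs with newer bytes winning on overlap and the largest run flushed first, might look like the following. All names and internals below are illustrative assumptions, not the actual SeaweedFS implementation:

// dirtyintervals_sketch.go: illustrative stand-in for ContinuousIntervals;
// names and layout are assumptions, not the real SeaweedFS code.
package main

import (
	"bytes"
	"fmt"
	"io"
	"sort"
)

// run is one contiguous span of dirty bytes.
type run struct {
	offset int64
	data   []byte
}

func (r *run) end() int64        { return r.offset + int64(len(r.data)) }
func (r *run) Reader() io.Reader { return bytes.NewReader(r.data) } // cf. maxList.ToReader()

// intervalList keeps runs sorted, disjoint, and coalesced.
type intervalList struct {
	runs []run
}

// AddInterval records a write and merges every run that touches or overlaps
// the new span; newer bytes overwrite older ones, which is the invariant
// AddPage relies on when it stops flushing on every out-of-order write.
func (l *intervalList) AddInterval(data []byte, offset int64) {
	merged := run{offset: offset, data: append([]byte(nil), data...)}
	var kept []run
	for _, r := range l.runs {
		if r.end() < merged.offset || r.offset > merged.end() {
			kept = append(kept, r) // disjoint: keep untouched
			continue
		}
		// Overlapping or adjacent: widen merged to cover both spans.
		start := min64(r.offset, merged.offset)
		buf := make([]byte, max64(r.end(), merged.end())-start)
		copy(buf[r.offset-start:], r.data)           // older bytes first
		copy(buf[merged.offset-start:], merged.data) // newer bytes win
		merged = run{offset: start, data: buf}
	}
	kept = append(kept, merged)
	sort.Slice(kept, func(i, j int) bool { return kept[i].offset < kept[j].offset })
	l.runs = kept
}

// TotalSize is the number of dirty bytes held; AddPage compares this
// against ChunkSizeLimit before deciding to flush.
func (l *intervalList) TotalSize() (n int64) {
	for _, r := range l.runs {
		n += int64(len(r.data))
	}
	return
}

// RemoveLargest pops the longest run, the counterpart of
// RemoveLargestIntervalLinkedList in saveExistingLargestPageToStorage.
func (l *intervalList) RemoveLargest() *run {
	if len(l.runs) == 0 {
		return nil
	}
	largest := 0
	for i := range l.runs {
		if len(l.runs[i].data) > len(l.runs[largest].data) {
			largest = i
		}
	}
	out := l.runs[largest]
	l.runs = append(l.runs[:largest], l.runs[largest+1:]...)
	return &out
}

func min64(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}

func max64(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func main() {
	var l intervalList
	l.AddInterval([]byte("hello"), 0) // dirty range [0,5)
	l.AddInterval([]byte("world"), 5) // adjacent: coalesces into [0,10)
	l.AddInterval([]byte("X"), 100)   // disjoint second run
	fmt.Println("dirty bytes:", l.TotalSize()) // 11
	r := l.RemoveLargest() // the [0,10) run would be flushed first
	fmt.Printf("flush [%d,%d): %s\n", r.offset, r.end(), r.data)
}

Evicting the largest interval first means a sequential writer still produces full ChunkSizeLimit-sized chunks, while a random writer keeps memory bounded because the biggest contiguous run is repeatedly flushed out.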
