diff options
Diffstat (limited to 'weed/filesys/dirty_page.go')
| -rw-r--r-- | weed/filesys/dirty_page.go | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go new file mode 100644 index 000000000..996eb0abb --- /dev/null +++ b/weed/filesys/dirty_page.go @@ -0,0 +1,165 @@ +package filesys + +import ( + "fmt" + "bytes" + "time" + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type ContinuousDirtyPages struct { + hasData bool + Offset int64 + Size int64 + Data []byte + f *File +} + +func newDirtyPages(file *File) *ContinuousDirtyPages { + return &ContinuousDirtyPages{ + Data: make([]byte, file.wfs.chunkSizeLimit), + f: file, + } +} + +func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { + + var chunk *filer_pb.FileChunk + + if len(data) > len(pages.Data) { + // this is more than what buffer can hold. + + // flush existing + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + chunks = append(chunks, chunk) + } else { + glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Size = 0 + + // flush the big page + if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + + return + } + + if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || + pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) { + // if the data is out of range, + // or buffer is full if adding new data, + // flush current buffer and add new data + + // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size) + + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if chunk != nil { + glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + chunks = append(chunks, chunk) + } + } else { + glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + return + } + pages.Offset = offset + pages.Size = int64(len(data)) + copy(pages.Data, data) + return + } + + copy(pages.Data[offset-pages.Offset:], data) + pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset) + + return +} + +func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) { + + if pages.Size == 0 { + return nil, nil + } + + if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { + pages.Size = 0 + if chunk != nil { + glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + } + return +} + +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { + return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) +} + +func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { + + if pages.Size == 0 { + return nil, nil + } + + var fileId, host string + + if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: pages.f.wfs.replication, + Collection: pages.f.wfs.collection, + } + + resp, err := client.AssignVolume(ctx, request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + + fileId, host = resp.FileId, resp.Url + + return nil + }); err != nil { + return nil, fmt.Errorf("filer assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + bufReader := bytes.NewReader(pages.Data[:pages.Size]) + uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) + return nil, fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) + return nil, fmt.Errorf("upload result: %v", uploadResult.Error) + } + + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: offset, + Size: uint64(len(buf)), + Mtime: time.Now().UnixNano(), + }, nil + +} + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} |
