Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dirty_page.go                        8
-rw-r--r--  weed/filesys/dirty_page_interval.go              73
-rw-r--r--  weed/filesys/file.go                             19
-rw-r--r--  weed/filesys/filehandle.go                        9
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go             6
-rw-r--r--  weed/filesys/meta_cache/meta_cache_subscribe.go  13
-rw-r--r--  weed/filesys/wfs.go                              19
-rw-r--r--  weed/filesys/wfs_write.go                         5
8 files changed, 71 insertions(+), 81 deletions(-)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 6fda134aa..dd0c48796 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,10 +9,12 @@ import (
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 var (
 	concurrentWriterLimit = runtime.NumCPU()
+	concurrentWriters     = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
 )
 
 type ContinuousDirtyPages struct {
@@ -93,8 +95,6 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
 
 	pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
 
-	maxList.Destroy()
-
 	return true
 }
 
@@ -110,10 +110,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 	go func() {
 		defer pages.writeWaitGroup.Done()
 
-		dir, _ := pages.f.fullpath().DirAndName()
-
 		reader = io.LimitReader(reader, size)
-		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
+		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
 		if err != nil {
 			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
 			pages.chunkSaveErrChan <- err
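Note: the new concurrentWriters executor caps in-flight chunk uploads at 4x the CPU count. The executor itself is not part of this diff; the sketch below is only a guess at the shape of util.NewLimitedConcurrentExecutor, assuming a plain channel-semaphore design (the Execute method name and all internals here are assumptions, not the real weed/util code):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// limitedConcurrentExecutor runs submitted jobs with at most `limit`
// goroutines in flight; Execute blocks once the limit is reached.
// (A sketch of the presumed design, not the real weed/util implementation.)
type limitedConcurrentExecutor struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

func newLimitedConcurrentExecutor(limit int) *limitedConcurrentExecutor {
	return &limitedConcurrentExecutor{slots: make(chan struct{}, limit)}
}

func (e *limitedConcurrentExecutor) Execute(job func()) {
	e.slots <- struct{}{} // acquire a slot; blocks while `limit` workers are busy
	e.wg.Add(1)
	go func() {
		defer func() { <-e.slots; e.wg.Done() }() // release the slot
		job()
	}()
}

func main() {
	executor := newLimitedConcurrentExecutor(4 * runtime.NumCPU())
	for i := 0; i < 100; i++ {
		i := i
		executor.Execute(func() {
			time.Sleep(10 * time.Millisecond) // stand-in for a chunk upload
			fmt.Println("saved chunk", i)
		})
	}
	executor.wg.Wait()
}
```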
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index f644bea0b..1404bf78c 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -5,7 +5,6 @@ import (
 	"io"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/valyala/bytebufferpool"
 )
 
 type IntervalNode struct {
@@ -13,15 +12,6 @@ type IntervalNode struct {
 	Offset int64
 	Size   int64
 	Next   *IntervalNode
-	Buffer *bytebufferpool.ByteBuffer
-}
-
-func (l *IntervalNode) Bytes() []byte {
-	data := l.Data
-	if data == nil {
-		data = l.Buffer.Bytes()
-	}
-	return data
 }
 
 type IntervalLinkedList struct {
@@ -33,39 +23,16 @@ type ContinuousIntervals struct {
 	lists []*IntervalLinkedList
 }
 
-func NewIntervalLinkedList(head, tail *IntervalNode) *IntervalLinkedList {
-	list := &IntervalLinkedList{
-		Head: head,
-		Tail: tail,
-	}
-	return list
-}
-
-func (list *IntervalLinkedList) Destroy() {
-	for t := list.Head; t != nil; t = t.Next {
-		if t.Buffer != nil {
-			bytebufferpool.Put(t.Buffer)
-		}
-	}
-}
-
 func (list *IntervalLinkedList) Offset() int64 {
 	return list.Head.Offset
 }
 func (list *IntervalLinkedList) Size() int64 {
 	return list.Tail.Offset + list.Tail.Size - list.Head.Offset
 }
-
 func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
 	// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
-	if list.Tail.Buffer == nil {
-		list.Tail.Buffer = bytebufferpool.Get()
-		list.Tail.Buffer.Write(list.Tail.Data)
-		list.Tail.Data = nil
-	}
-	list.Tail.Buffer.Write(node.Data)
-	list.Tail.Size += int64(len(node.Data))
-	return
+	list.Tail.Next = node
+	list.Tail = node
 }
 func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
 	// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
@@ -80,7 +47,7 @@ func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
 		nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
 		if nodeStart < nodeStop {
 			// glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
-			copy(buf[nodeStart-start:], t.Bytes()[nodeStart-t.Offset:nodeStop-t.Offset])
+			copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
 		}
 
 		if t.Next == nil {
@@ -105,15 +72,8 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
 			// skip non overlapping IntervalNode
 			continue
 		}
-		data := t.Bytes()[nodeStart-t.Offset : nodeStop-t.Offset]
-		if t.Data == nil {
-			// need to clone if the bytes is from byte buffer
-			t := make([]byte, len(data))
-			copy(t, data)
-			data = t
-		}
 		nodes = append(nodes, &IntervalNode{
-			Data:   data,
+			Data:   t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
 			Offset: nodeStart,
 			Size:   nodeStop - nodeStart,
 			Next:   nil,
@@ -122,7 +82,10 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
 	for i := 1; i < len(nodes); i++ {
 		nodes[i-1].Next = nodes[i]
 	}
-	return NewIntervalLinkedList(nodes[0], nodes[len(nodes)-1])
+	return &IntervalLinkedList{
+		Head: nodes[0],
+		Tail: nodes[len(nodes)-1],
+	}
 }
 
 func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
@@ -132,7 +95,7 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
 	// append to the tail and return
 	if len(c.lists) == 1 {
 		lastSpan := c.lists[0]
-		if lastSpan.Tail.Offset + lastSpan.Tail.Size == offset {
+		if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset {
 			lastSpan.addNodeToTail(interval)
 			return
 		}
@@ -190,7 +153,10 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
 		nextList.addNodeToHead(interval)
 	}
 	if prevList == nil && nextList == nil {
-		c.lists = append(c.lists, NewIntervalLinkedList(interval, interval))
+		c.lists = append(c.lists, &IntervalLinkedList{
+			Head: interval,
+			Tail: interval,
+		})
 	}
 
 	return
@@ -198,12 +164,11 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
 
 func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
 	var maxSize int64
-	maxIndex, maxOffset := -1, int64(-1)
+	maxIndex := -1
 	for k, list := range c.lists {
-		listSize := list.Size()
-		if maxSize < listSize || (maxSize == listSize && list.Offset() < maxOffset ) {
-			maxSize = listSize
-			maxIndex, maxOffset = k, list.Offset()
+		if maxSize <= list.Size() {
+			maxSize = list.Size()
+			maxIndex = k
 		}
 	}
 	if maxSize <= 0 {
@@ -246,10 +211,10 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto
 func (l *IntervalLinkedList) ToReader() io.Reader {
 	var readers []io.Reader
 	t := l.Head
-	readers = append(readers, util.NewBytesReader(t.Bytes()))
+	readers = append(readers, util.NewBytesReader(t.Data))
 	for t.Next != nil {
 		t = t.Next
-		readers = append(readers, bytes.NewReader(t.Bytes()))
+		readers = append(readers, bytes.NewReader(t.Data))
 	}
 	if len(readers) == 1 {
 		return readers[0]
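Note: this file reverts the bytebufferpool experiment. addNodeToTail no longer copies incoming bytes into a pooled buffer held by the tail node; it simply links the new node, and ToReader later stitches the per-write buffers back into one stream. A standalone sketch of that linked-node append, using trimmed-down stand-ins for the real types:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// intervalNode and intervalList mirror the post-revert IntervalNode and
// IntervalLinkedList: every written slice stays its own node.
type intervalNode struct {
	data   []byte
	offset int64
	next   *intervalNode
}

type intervalList struct{ head, tail *intervalNode }

// addNodeToTail links the node instead of copying its bytes anywhere,
// matching the two-line body the diff restores.
func (l *intervalList) addNodeToTail(n *intervalNode) {
	l.tail.next = n
	l.tail = n
}

// toReader stitches the node buffers back into one stream, the same way
// IntervalLinkedList.ToReader chains readers over t.Data.
func (l *intervalList) toReader() io.Reader {
	var readers []io.Reader
	for t := l.head; t != nil; t = t.next {
		readers = append(readers, bytes.NewReader(t.data))
	}
	return io.MultiReader(readers...)
}

func main() {
	head := &intervalNode{data: []byte("hello "), offset: 0}
	list := &intervalList{head: head, tail: head}
	list.addNodeToTail(&intervalNode{data: []byte("world"), offset: 6})

	all, _ := io.ReadAll(list.toReader())
	fmt.Printf("%s\n", all) // hello world
}
```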
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 98ee010d8..7aa1016d7 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,15 +253,16 @@ func (file *File) Forget() {
 }
 
 func (file *File) maybeLoadEntry(ctx context.Context) error {
-	if (file.entry == nil || len(file.entry.HardLinkId) != 0) && file.isOpen <= 0 {
-		entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
-		if err != nil {
-			glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
-			return err
-		}
-		if entry != nil {
-			file.setEntry(entry)
-		}
+	if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
+		return nil
+	}
+	entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+	if err != nil {
+		glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+		return err
+	}
+	if entry != nil {
+		file.setEntry(entry)
 	}
 	return nil
 }
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index e3163117c..54bde3494 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -126,7 +126,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
 
 	totalRead, err := fh.f.reader.ReadAt(buff, offset)
 
-	if err != nil && err != io.EOF{
+	if err != nil && err != io.EOF {
 		glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
 	}
 
@@ -143,6 +143,11 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 
 	// write the request to volume servers
 	data := req.Data
+	if len(data) <= 512 {
+		// fuse message cacheable size
+		data = make([]byte, len(req.Data))
+		copy(data, req.Data)
+	}
 
 	fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
 	glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
@@ -251,7 +256,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 		manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
 
 		chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
-		chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+		chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
 		if manifestErr != nil {
 			// not good, but should be ok
 			glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
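Note: the new copy in FileHandle.Write guards retained write payloads. Per the diff's "fuse message cacheable size" comment, the FUSE library presumably caches and reuses small message buffers once the handler returns, so a req.Data slice kept for the asynchronous dirty-page flush could otherwise be overwritten. A sketch of the same guard pulled out as a helper (retainSmallWrite is a made-up name; the diff inlines this logic):

```go
// retainSmallWrite copies payloads at or below the assumed FUSE message
// cacheable size (512 bytes, from the diff) before they are retained past
// the handler's return. Larger buffers are presumed allocated per request
// and safe to keep as-is.
func retainSmallWrite(reqData []byte) []byte {
	if len(reqData) > 512 {
		return reqData
	}
	data := make([]byte, len(reqData))
	copy(data, reqData)
	return data
}
```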
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 0dd129623..4b282253d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -21,13 +21,15 @@ type MetaCache struct {
 	sync.RWMutex
 	visitedBoundary *bounded_tree.BoundedTree
 	uidGidMapper    *UidGidMapper
+	invalidateFunc  func(util.FullPath)
 }
 
-func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
 	return &MetaCache{
 		localStore:      openMetaStore(dbFolder),
 		visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
 		uidGidMapper:    uidGidMapper,
+		invalidateFunc:  invalidateFunc,
 	}
 }
 
@@ -70,6 +72,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
 			// skip the unnecessary deletion
 			// leave the update to the following InsertEntry operation
 		} else {
+			glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
 			if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
 				return err
 			}
@@ -82,6 +85,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
 	if newEntry != nil {
 		newDir, _ := newEntry.DirAndName()
 		if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+			glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
 			if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
 				return err
 			}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index 9b72cadcf..f9973f436 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -23,15 +23,15 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
 			}
 		}
 
+		dir := resp.Directory
 		var oldPath util.FullPath
 		var newEntry *filer.Entry
 		if message.OldEntry != nil {
-			oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+			oldPath = util.NewFullPath(dir, message.OldEntry.Name)
 			glog.V(4).Infof("deleting %v", oldPath)
 		}
 
 		if message.NewEntry != nil {
-			dir := resp.Directory
 			if message.NewParentPath != "" {
 				dir = message.NewParentPath
 			}
@@ -39,7 +39,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
 			glog.V(4).Infof("creating %v", key)
 			newEntry = filer.FromPbEntry(dir, message.NewEntry)
 		}
-		return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+		err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+		if err == nil && message.OldEntry != nil && message.NewEntry != nil {
+			key := util.NewFullPath(dir, message.NewEntry.Name)
+			mc.invalidateFunc(key)
+		}
+
+		return err
+
 	}
 
 	for {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 265fc95a8..759e21b15 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -92,7 +92,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 		wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
 	}
 
-	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
+	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+		fsNode := wfs.fsNodeCache.GetFsNode(filePath)
+		if fsNode != nil {
+			if file, ok := fsNode.(*File); ok {
+				file.entry = nil
+			}
+		}
+	})
 	startTime := time.Now()
 	go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
 	grace.OnInterrupt(func() {
@@ -119,10 +126,12 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
 	defer wfs.handlesLock.Unlock()
 
 	inodeId := file.fullpath().AsInode()
-	existingHandle, found := wfs.handles[inodeId]
-	if found && existingHandle != nil {
-		file.isOpen++
-		return existingHandle
+	if file.isOpen > 0 {
+		existingHandle, found := wfs.handles[inodeId]
+		if found && existingHandle != nil {
+			file.isOpen++
+			return existingHandle
+		}
 	}
 
 	fileHandle = newFileHandle(file, uid, gid)
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 27b2297ed..83e40e7f5 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -10,9 +10,10 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
 
 	return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
 		var fileId, host string
@@ -26,7 +27,7 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
 				Collection:  wfs.option.Collection,
 				TtlSec:      wfs.option.TtlSec,
 				DataCenter:  wfs.option.DataCenter,
-				ParentPath:  dir,
+				Path:        string(fullPath),
 			}
 
 			resp, err := client.AssignVolume(context.Background(), request)
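Note: the invalidateFunc plumbing spans three of the files above: NewMetaCache stores the callback, SubscribeMetaEvents fires it after a successful update where both old and new entries exist, and the closure in wfs.go clears the cached File entry. This keeps the dependency one-way, since meta_cache never imports the FUSE layer. A minimal sketch of the pattern with stand-in types (none of these are the real SeaweedFS types):

```go
package main

import "fmt"

type FullPath string // stand-in for util.FullPath

// MetaCache holds an invalidation callback instead of referencing the FUSE
// layer directly. Sketch only; the real MetaCache wraps a local store.
type MetaCache struct {
	invalidateFunc func(FullPath)
}

func NewMetaCache(invalidateFunc func(FullPath)) *MetaCache {
	return &MetaCache{invalidateFunc: invalidateFunc}
}

// onMetaEvent mimics the subscribe loop: after the local store is updated,
// an update to an existing entry (old and new both present) invalidates
// whatever the owner cached for that path.
func (mc *MetaCache) onMetaEvent(path FullPath, hadOld, hasNew bool, updateErr error) error {
	if updateErr == nil && hadOld && hasNew {
		mc.invalidateFunc(path)
	}
	return updateErr
}

func main() {
	// The wfs.go closure sets file.entry = nil; deleting from a map plays
	// that role here.
	cachedEntries := map[FullPath]string{"/mnt/weed/a.txt": "cached entry"}
	mc := NewMetaCache(func(p FullPath) { delete(cachedEntries, p) })

	mc.onMetaEvent("/mnt/weed/a.txt", true, true, nil)
	fmt.Println(cachedEntries) // map[] — the stale entry is gone
}
```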
