aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go8
-rw-r--r--weed/filesys/dirty_page_interval.go73
-rw-r--r--weed/filesys/file.go19
-rw-r--r--weed/filesys/filehandle.go9
-rw-r--r--weed/filesys/meta_cache/meta_cache.go6
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go13
-rw-r--r--weed/filesys/wfs.go19
-rw-r--r--weed/filesys/wfs_write.go5
8 files changed, 71 insertions, 81 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 6fda134aa..dd0c48796 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,10 +9,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var (
concurrentWriterLimit = runtime.NumCPU()
+ concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
)
type ContinuousDirtyPages struct {
@@ -93,8 +95,6 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
- maxList.Destroy()
-
return true
}
@@ -110,10 +110,8 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
go func() {
defer pages.writeWaitGroup.Done()
- dir, _ := pages.f.fullpath().DirAndName()
-
reader = io.LimitReader(reader, size)
- chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.chunkSaveErrChan <- err
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index f644bea0b..1404bf78c 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -5,7 +5,6 @@ import (
"io"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/valyala/bytebufferpool"
)
type IntervalNode struct {
@@ -13,15 +12,6 @@ type IntervalNode struct {
Offset int64
Size int64
Next *IntervalNode
- Buffer *bytebufferpool.ByteBuffer
-}
-
-func (l *IntervalNode) Bytes() []byte {
- data := l.Data
- if data == nil {
- data = l.Buffer.Bytes()
- }
- return data
}
type IntervalLinkedList struct {
@@ -33,39 +23,16 @@ type ContinuousIntervals struct {
lists []*IntervalLinkedList
}
-func NewIntervalLinkedList(head, tail *IntervalNode) *IntervalLinkedList {
- list := &IntervalLinkedList{
- Head: head,
- Tail: tail,
- }
- return list
-}
-
-func (list *IntervalLinkedList) Destroy() {
- for t := list.Head; t != nil; t = t.Next {
- if t.Buffer != nil {
- bytebufferpool.Put(t.Buffer)
- }
- }
-}
-
func (list *IntervalLinkedList) Offset() int64 {
return list.Head.Offset
}
func (list *IntervalLinkedList) Size() int64 {
return list.Tail.Offset + list.Tail.Size - list.Head.Offset
}
-
func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
- if list.Tail.Buffer == nil {
- list.Tail.Buffer = bytebufferpool.Get()
- list.Tail.Buffer.Write(list.Tail.Data)
- list.Tail.Data = nil
- }
- list.Tail.Buffer.Write(node.Data)
- list.Tail.Size += int64(len(node.Data))
- return
+ list.Tail.Next = node
+ list.Tail = node
}
func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
@@ -80,7 +47,7 @@ func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
if nodeStart < nodeStop {
// glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
- copy(buf[nodeStart-start:], t.Bytes()[nodeStart-t.Offset:nodeStop-t.Offset])
+ copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
}
if t.Next == nil {
@@ -105,15 +72,8 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
// skip non overlapping IntervalNode
continue
}
- data := t.Bytes()[nodeStart-t.Offset : nodeStop-t.Offset]
- if t.Data == nil {
- // need to clone if the bytes is from byte buffer
- t := make([]byte, len(data))
- copy(t, data)
- data = t
- }
nodes = append(nodes, &IntervalNode{
- Data: data,
+ Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
Offset: nodeStart,
Size: nodeStop - nodeStart,
Next: nil,
@@ -122,7 +82,10 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
for i := 1; i < len(nodes); i++ {
nodes[i-1].Next = nodes[i]
}
- return NewIntervalLinkedList(nodes[0], nodes[len(nodes)-1])
+ return &IntervalLinkedList{
+ Head: nodes[0],
+ Tail: nodes[len(nodes)-1],
+ }
}
func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
@@ -132,7 +95,7 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
// append to the tail and return
if len(c.lists) == 1 {
lastSpan := c.lists[0]
- if lastSpan.Tail.Offset + lastSpan.Tail.Size == offset {
+ if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset {
lastSpan.addNodeToTail(interval)
return
}
@@ -190,7 +153,10 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
nextList.addNodeToHead(interval)
}
if prevList == nil && nextList == nil {
- c.lists = append(c.lists, NewIntervalLinkedList(interval, interval))
+ c.lists = append(c.lists, &IntervalLinkedList{
+ Head: interval,
+ Tail: interval,
+ })
}
return
@@ -198,12 +164,11 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
var maxSize int64
- maxIndex, maxOffset := -1, int64(-1)
+ maxIndex := -1
for k, list := range c.lists {
- listSize := list.Size()
- if maxSize < listSize || (maxSize == listSize && list.Offset() < maxOffset ) {
- maxSize = listSize
- maxIndex, maxOffset = k, list.Offset()
+ if maxSize <= list.Size() {
+ maxSize = list.Size()
+ maxIndex = k
}
}
if maxSize <= 0 {
@@ -246,10 +211,10 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto
func (l *IntervalLinkedList) ToReader() io.Reader {
var readers []io.Reader
t := l.Head
- readers = append(readers, util.NewBytesReader(t.Bytes()))
+ readers = append(readers, util.NewBytesReader(t.Data))
for t.Next != nil {
t = t.Next
- readers = append(readers, bytes.NewReader(t.Bytes()))
+ readers = append(readers, bytes.NewReader(t.Data))
}
if len(readers) == 1 {
return readers[0]
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 98ee010d8..7aa1016d7 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,15 +253,16 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
- if (file.entry == nil || len(file.entry.HardLinkId) != 0) && file.isOpen <= 0 {
- entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
- if err != nil {
- glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return err
- }
- if entry != nil {
- file.setEntry(entry)
- }
+ if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
+ return nil
+ }
+ entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+ if err != nil {
+ glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ return err
+ }
+ if entry != nil {
+ file.setEntry(entry)
}
return nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index e3163117c..54bde3494 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -126,7 +126,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
totalRead, err := fh.f.reader.ReadAt(buff, offset)
- if err != nil && err != io.EOF{
+ if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
@@ -143,6 +143,11 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
// write the request to volume servers
data := req.Data
+ if len(data) <= 512 {
+ // fuse message cacheable size
+ data = make([]byte, len(req.Data))
+ copy(data, req.Data)
+ }
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
@@ -251,7 +256,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
- chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+ chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 0dd129623..4b282253d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -21,13 +21,15 @@ type MetaCache struct {
sync.RWMutex
visitedBoundary *bounded_tree.BoundedTree
uidGidMapper *UidGidMapper
+ invalidateFunc func(util.FullPath)
}
-func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
+ invalidateFunc: invalidateFunc,
}
}
@@ -70,6 +72,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
+ glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
@@ -82,6 +85,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index 9b72cadcf..f9973f436 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -23,15 +23,15 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
}
+ dir := resp.Directory
var oldPath util.FullPath
var newEntry *filer.Entry
if message.OldEntry != nil {
- oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
}
if message.NewEntry != nil {
- dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
@@ -39,7 +39,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
glog.V(4).Infof("creating %v", key)
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
- return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil && message.OldEntry != nil && message.NewEntry != nil {
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(key)
+ }
+
+ return err
+
}
for {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 265fc95a8..759e21b15 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -92,7 +92,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+ fsNode := wfs.fsNodeCache.GetFsNode(filePath)
+ if fsNode != nil {
+ if file, ok := fsNode.(*File); ok {
+ file.entry = nil
+ }
+ }
+ })
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {
@@ -119,10 +126,12 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
defer wfs.handlesLock.Unlock()
inodeId := file.fullpath().AsInode()
- existingHandle, found := wfs.handles[inodeId]
- if found && existingHandle != nil {
- file.isOpen++
- return existingHandle
+ if file.isOpen > 0 {
+ existingHandle, found := wfs.handles[inodeId]
+ if found && existingHandle != nil {
+ file.isOpen++
+ return existingHandle
+ }
}
fileHandle = newFileHandle(file, uid, gid)
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 27b2297ed..83e40e7f5 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -10,9 +10,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
-func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
@@ -26,7 +27,7 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
Collection: wfs.option.Collection,
TtlSec: wfs.option.TtlSec,
DataCenter: wfs.option.DataCenter,
- ParentPath: dir,
+ Path: string(fullPath),
}
resp, err := client.AssignVolume(context.Background(), request)