aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go11
-rw-r--r--weed/filesys/meta_cache/meta_cache.go9
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go35
-rw-r--r--weed/filesys/wfs.go2
-rw-r--r--weed/filesys/wfs_deletion.go2
-rw-r--r--weed/filesys/wfs_filer_client.go17
-rw-r--r--weed/filesys/wfs_write.go8
7 files changed, 51 insertions, 33 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 4dede3a8b..ae2ae3418 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -234,7 +234,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
- meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
+ visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if visitErr != nil {
+ glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
+ return nil, fuse.EIO
+ }
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
@@ -296,7 +300,10 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
}
dirPath := util.FullPath(dir.FullPath())
- meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return nil, fuse.EIO
+ }
listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index bb81d6d27..0dd129623 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -2,6 +2,7 @@ package meta_cache
import (
"context"
+ "fmt"
"os"
"sync"
@@ -22,10 +23,10 @@ type MetaCache struct {
uidGidMapper *UidGidMapper
}
-func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
- visitedBoundary: bounded_tree.NewBoundedTree(),
+ visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
}
}
@@ -116,6 +117,10 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
mc.RLock()
defer mc.RUnlock()
+ if !mc.visitedBoundary.HasVisited(dirPath) {
+ return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ }
+
entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 455a8772c..f42d61230 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -3,6 +3,8 @@ package meta_cache
import (
"context"
"fmt"
+ "strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,25 +12,34 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
- mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+ return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
- err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(dirPath), pbEntry)
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
+ for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
+ err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ if err == nil {
+ break
}
- if entry.IsDirectory() {
- childDirectories = append(childDirectories, entry.Name())
+ if strings.Contains(err.Error(), "transport: ") {
+ glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
+ time.Sleep(waitTime)
+ continue
}
- return nil
- })
- if err != nil {
err = fmt.Errorf("list %s: %v", dirPath, err)
+ break
}
return
})
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 57b4c3da5..265fc95a8 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -92,7 +92,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index 9791c8630..a245b6795 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc.Url),
+ Url: wfs.AdjustedUrl(loc),
PublicUrl: loc.PublicUrl,
})
}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index 736df3588..096ee555f 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -1,9 +1,6 @@
package filesys
import (
- "fmt"
- "strings"
-
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -26,15 +23,9 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
}
-func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
- if !wfs.option.OutsideContainerClusterMode {
- return hostAndPort
- }
- commaIndex := strings.Index(hostAndPort, ":")
- if commaIndex < 0 {
- return hostAndPort
+func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
+ if wfs.option.OutsideContainerClusterMode {
+ return location.PublicUrl
}
- filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
- return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
-
+ return location.Url
}
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index fec33e4ab..e7db31203 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -38,8 +38,12 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType {
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = wfs.AdjustedUrl(host)
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := &filer_pb.Location{
+ Url: resp.Url,
+ PublicUrl: resp.PublicUrl,
+ }
+ host = wfs.AdjustedUrl(loc)
collection, replication = resp.Collection, resp.Replication
return nil