aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go45
-rw-r--r--weed/filer/filechunk_manifest.go11
-rw-r--r--weed/filer/filer.go8
-rw-r--r--weed/filer/filer_buckets.go2
-rw-r--r--weed/filer/filer_conf.go1
-rw-r--r--weed/filer/filer_delete_entry.go8
-rw-r--r--weed/filer/filer_notify.go4
-rw-r--r--weed/filer/filer_on_meta_event.go25
-rw-r--r--weed/filer/filer_search.go34
-rw-r--r--weed/filer/filerstore.go5
-rw-r--r--weed/filer/filerstore_wrapper.go25
-rw-r--r--weed/filer/leveldb/leveldb_store.go2
-rw-r--r--weed/filer/leveldb/leveldb_store_test.go6
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go1
-rw-r--r--weed/filer/leveldb2/leveldb2_store_test.go6
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go1
-rw-r--r--weed/filer/leveldb3/leveldb3_store_test.go6
-rw-r--r--weed/filer/meta_aggregator.go1
-rw-r--r--weed/filer/mysql/mysql_sql_gen.go42
-rw-r--r--weed/filer/mysql/mysql_store.go11
-rw-r--r--weed/filer/mysql2/mysql2_store.go8
-rw-r--r--weed/filer/postgres/postgres_sql_gen.go41
-rw-r--r--weed/filer/postgres/postgres_store.go8
-rw-r--r--weed/filer/postgres2/postgres2_store.go8
-rw-r--r--weed/filer/read_write.go12
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go2
-rw-r--r--weed/filer/rocksdb/rocksdb_store_test.go6
-rw-r--r--weed/filer/stream.go34
28 files changed, 262 insertions, 101 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 07ce56145..ab8f6bcbd 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -13,15 +13,15 @@ import (
)
type SqlGenerator interface {
- GetSqlInsert(bucket string) string
- GetSqlUpdate(bucket string) string
- GetSqlFind(bucket string) string
- GetSqlDelete(bucket string) string
- GetSqlDeleteFolderChildren(bucket string) string
- GetSqlListExclusive(bucket string) string
- GetSqlListInclusive(bucket string) string
- GetSqlCreateTable(bucket string) string
- GetSqlDropTable(bucket string) string
+ GetSqlInsert(tableName string) string
+ GetSqlUpdate(tableName string) string
+ GetSqlFind(tableName string) string
+ GetSqlDelete(tableName string) string
+ GetSqlDeleteFolderChildren(tableName string) string
+ GetSqlListExclusive(tableName string) string
+ GetSqlListInclusive(tableName string) string
+ GetSqlCreateTable(tableName string) string
+ GetSqlDropTable(tableName string) string
}
type AbstractSqlStore struct {
@@ -32,6 +32,29 @@ type AbstractSqlStore struct {
dbsLock sync.Mutex
}
+func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ store.CreateTable(context.Background(), bucket)
+
+ if store.dbs == nil {
+ return
+ }
+ store.dbs[bucket] = true
+}
+func (store *AbstractSqlStore) OnBucketDeletion(bucket string) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ store.deleteTable(context.Background(), bucket)
+
+ if store.dbs == nil {
+ return
+ }
+ delete(store.dbs, bucket)
+}
+
const (
DEFAULT_TABLE = "filemeta"
)
@@ -253,7 +276,9 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
}
}
- res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
+ glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
+
+ res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 99a62c90c..c709dc819 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -97,20 +97,20 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
var err error
- var buffer bytes.Buffer
var shouldRetry bool
+ receivedData := make([]byte, 0, size)
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
- buffer.Write(data)
+ receivedData = receivedData[:0]
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ receivedData = append(receivedData, data...)
})
if !shouldRetry {
break
}
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
- buffer.Reset()
} else {
break
}
@@ -123,7 +123,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
}
}
- return buffer.Bytes(), err
+ return receivedData, err
+
}
func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index e59887763..effdc0e4e 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -241,12 +241,12 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
if oldEntry.IsDirectory() && !entry.IsDirectory() {
- glog.Errorf("existing %s is a directory", entry.FullPath)
- return fmt.Errorf("existing %s is a directory", entry.FullPath)
+ glog.Errorf("existing %s is a directory", oldEntry.FullPath)
+ return fmt.Errorf("existing %s is a directory", oldEntry.FullPath)
}
if !oldEntry.IsDirectory() && entry.IsDirectory() {
- glog.Errorf("existing %s is a file", entry.FullPath)
- return fmt.Errorf("existing %s is a file", entry.FullPath)
+ glog.Errorf("existing %s is a file", oldEntry.FullPath)
+ return fmt.Errorf("existing %s is a file", oldEntry.FullPath)
}
}
return f.Store.UpdateEntry(ctx, entry)
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index ba170f02e..43fb000c9 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -29,7 +29,7 @@ func (f *Filer) LoadBuckets() {
limit := int64(math.MaxInt32)
- entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
+ entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 8e549f5ad..ab5afc5cc 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -18,6 +18,7 @@ const (
FilerConfName = "filer.conf"
IamConfigDirecotry = "/etc/iam"
IamIdentityFile = "identity.json"
+ IamPoliciesFile = "policies.json"
)
type FilerConf struct {
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 50a669f40..3ef3cfff9 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -11,6 +11,10 @@ import (
type HardLinkId []byte
+const (
+ MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
+)
+
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
if p == "/" {
return nil
@@ -69,7 +73,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
includeLastFile := false
if !isDeletingBucket {
for {
- entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
+ entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "")
if err != nil {
glog.Errorf("list folder %s: %v", entry.FullPath, err)
return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
@@ -77,7 +81,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
}
for _, sub := range entries {
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index c461a82b8..7ab101102 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -116,13 +116,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "")
+ dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "", "")
if listDayErr != nil {
return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 295a5039e..a91faeb24 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -11,6 +11,28 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
+ f.maybeReloadFilerConfiguration(event)
+ f.onBucketEvents(event)
+}
+
+func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
+ message := event.EventNotification
+ for _, sig := range message.Signatures {
+ if sig == f.Signature {
+ return
+ }
+ }
+ if f.DirBucketsPath == event.Directory {
+ if message.OldEntry == nil && message.NewEntry != nil {
+ f.Store.OnBucketCreation(message.NewEntry.Name)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ f.Store.OnBucketDeletion(message.OldEntry.Name)
+ }
+ }
+}
+
+func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) {
if DirectoryEtcSeaweedFS != event.Directory {
if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath {
return
@@ -26,12 +48,11 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse)
if entry.Name == FilerConfName {
f.reloadFilerConfiguration(entry)
}
-
}
func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
var buf bytes.Buffer
- err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
+ err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64, false)
if err != nil {
return nil, err
}
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 0a14d3756..f43312cfa 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -20,9 +20,9 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
}
// For now, prefix and namePattern are mutually exclusive
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string) (entries []*Entry, hasMore bool, err error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error) {
- _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, func(entry *Entry) bool {
+ _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
entries = append(entries, entry)
return true
})
@@ -36,7 +36,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
}
// For now, prefix and namePattern are mutually exclusive
-func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
+func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
@@ -47,30 +47,38 @@ func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath,
}
var missedCount int64
- missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc)
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
for missedCount > 0 && err == nil {
- missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc)
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc)
}
return
}
-func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
+func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
- if len(restNamePattern) == 0 {
+ if len(restNamePattern) == 0 && len(namePatternExclude) == 0{
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
return 0, lastFileName, err
}
lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
- nameToTest := strings.ToLower(entry.Name())
- if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched {
- if !eachEntryFunc(entry) {
- return false
+ nameToTest := entry.Name()
+ if len(namePatternExclude) > 0 {
+ if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
+ missedCount++
+ return true
}
- } else {
- missedCount++
+ }
+ if len(restNamePattern) > 0 {
+ if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched {
+ missedCount++
+ return true
+ }
+ }
+ if !eachEntryFunc(entry) {
+ return false
}
return true
})
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 8955a25c7..a5b2f25de 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -39,3 +39,8 @@ type FilerStore interface {
Shutdown()
}
+
+type BucketAware interface {
+ OnBucketCreation(bucket string)
+ OnBucketDeletion(bucket string)
+}
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 64baac371..cd7c0bea3 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -21,6 +21,8 @@ type VirtualFilerStore interface {
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
AddPathSpecificStore(path string, storeId string, store FilerStore)
+ OnBucketCreation(bucket string)
+ OnBucketDeletion(bucket string)
}
type FilerStoreWrapper struct {
@@ -40,6 +42,27 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
}
}
+func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
+ for _, store := range fsw.storeIdToStore {
+ if ba, ok := store.(BucketAware); ok {
+ ba.OnBucketCreation(bucket)
+ }
+ }
+ if ba, ok := fsw.defaultStore.(BucketAware); ok {
+ ba.OnBucketCreation(bucket)
+ }
+}
+func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
+ for _, store := range fsw.storeIdToStore {
+ if ba, ok := store.(BucketAware); ok {
+ ba.OnBucketDeletion(bucket)
+ }
+ }
+ if ba, ok := fsw.defaultStore.(BucketAware); ok {
+ ba.OnBucketDeletion(bucket)
+ }
+}
+
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
err := fsw.pathToStore.Put([]byte(path), storeId)
@@ -126,8 +149,8 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("FindEntry %s", fp)
entry, err = actualStore.FindEntry(ctx, fp)
+ glog.V(4).Infof("FindEntry %s: %v", fp, err)
if err != nil {
return nil, err
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index ac0ad4ba6..ce454f36a 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -8,6 +8,7 @@ import (
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+ "os"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -38,6 +39,7 @@ func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, pre
func (store *LevelDBStore) initialize(dir string) (err error) {
glog.Infof("filer store dir: %s", dir)
+ os.MkdirAll(dir, 0755)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index 9c342605e..d437895f5 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -51,14 +51,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -77,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index c76340da4..124d61c1c 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -38,6 +38,7 @@ func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, pr
func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
glog.Infof("filer store leveldb2 dir: %s", dir)
+ os.MkdirAll(dir, 0755)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index 495c73fdd..fd0ad18a3 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index c81ea7bbe..d1cdfbbf6 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -45,6 +45,7 @@ func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, pr
func (store *LevelDB3Store) initialize(dir string) (err error) {
glog.Infof("filer store leveldb3 dir: %s", dir)
+ os.MkdirAll(dir, 0755)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
index 53b0e927f..0b970a539 100644
--- a/weed/filer/leveldb3/leveldb3_store_test.go
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 9437e9992..5c368a57e 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -69,6 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
peerSignature, err = ma.readFilerStoreSignature(peer)
}
+ // when filer store is not shared by multiple filers
if peerSignature != f.Signature {
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs
diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go
index 4213cf965..93d3e3f9e 100644
--- a/weed/filer/mysql/mysql_sql_gen.go
+++ b/weed/filer/mysql/mysql_sql_gen.go
@@ -2,6 +2,7 @@ package mysql
import (
"fmt"
+
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
_ "github.com/go-sql-driver/mysql"
)
@@ -9,44 +10,49 @@ import (
type SqlGenMysql struct {
CreateTableSqlTemplate string
DropTableSqlTemplate string
+ UpsertQueryTemplate string
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenMysql{})
)
-func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
- return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
+func (gen *SqlGenMysql) GetSqlInsert(tableName string) string {
+ if gen.UpsertQueryTemplate != "" {
+ return fmt.Sprintf(gen.UpsertQueryTemplate, tableName)
+ } else {
+ return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", tableName)
+ }
}
-func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
- return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
+func (gen *SqlGenMysql) GetSqlUpdate(tableName string) string {
+ return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", tableName)
}
-func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
- return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
+func (gen *SqlGenMysql) GetSqlFind(tableName string) string {
+ return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", tableName)
}
-func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
- return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket)
+func (gen *SqlGenMysql) GetSqlDelete(tableName string) string {
+ return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", tableName)
}
-func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
- return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", bucket)
+func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(tableName string) string {
+ return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", tableName)
}
-func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+func (gen *SqlGenMysql) GetSqlListExclusive(tableName string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", tableName)
}
-func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
- return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+func (gen *SqlGenMysql) GetSqlListInclusive(tableName string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", tableName)
}
-func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string {
- return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+func (gen *SqlGenMysql) GetSqlCreateTable(tableName string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}
-func (gen *SqlGenMysql) GetSqlDropTable(bucket string) string {
- return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+func (gen *SqlGenMysql) GetSqlDropTable(tableName string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, tableName)
}
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 501ab1d39..fbaa4d5f9 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -3,9 +3,10 @@ package mysql
import (
"database/sql"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/go-sql-driver/mysql"
@@ -29,6 +30,8 @@ func (store *MysqlStore) GetName() string {
func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
+ configuration.GetString(prefix+"upsertQuery"),
+ configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
@@ -41,13 +44,17 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
)
}
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen,
+func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
maxLifetimeSeconds int, interpolateParams bool) (err error) {
store.SupportBucketTable = false
+ if !enableUpsert {
+ upsertQuery = ""
+ }
store.SqlGenerator = &SqlGenMysql{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: "drop table `%s`",
+ UpsertQueryTemplate: upsertQuery,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index c796cd6aa..a1f54455a 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -32,6 +32,8 @@ func (store *MysqlStore2) GetName() string {
func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"upsertQuery"),
+ configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
@@ -44,13 +46,17 @@ func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix st
)
}
-func (store *MysqlStore2) initialize(createTable, user, password, hostname string, port int, database string, maxIdle, maxOpen,
+func (store *MysqlStore2) initialize(createTable, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
maxLifetimeSeconds int, interpolateParams bool) (err error) {
store.SupportBucketTable = true
+ if !enableUpsert {
+ upsertQuery = ""
+ }
store.SqlGenerator = &mysql.SqlGenMysql{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: "drop table `%s`",
+ UpsertQueryTemplate: upsertQuery,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go
index e13070c3d..6cee3d2da 100644
--- a/weed/filer/postgres/postgres_sql_gen.go
+++ b/weed/filer/postgres/postgres_sql_gen.go
@@ -10,44 +10,49 @@ import (
type SqlGenPostgres struct {
CreateTableSqlTemplate string
DropTableSqlTemplate string
+ UpsertQueryTemplate string
}
var (
_ = abstract_sql.SqlGenerator(&SqlGenPostgres{})
)
-func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
- return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, bucket)
+func (gen *SqlGenPostgres) GetSqlInsert(tableName string) string {
+ if gen.UpsertQueryTemplate != "" {
+ return fmt.Sprintf(gen.UpsertQueryTemplate, tableName)
+ } else {
+ return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, tableName)
+ }
}
-func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
- return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, bucket)
+func (gen *SqlGenPostgres) GetSqlUpdate(tableName string) string {
+ return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
- return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
+func (gen *SqlGenPostgres) GetSqlFind(tableName string) string {
+ return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
- return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket)
+func (gen *SqlGenPostgres) GetSqlDelete(tableName string) string {
+ return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
- return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, bucket)
+func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(tableName string) string {
+ return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
- return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
+func (gen *SqlGenPostgres) GetSqlListExclusive(tableName string) string {
+ return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
- return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket)
+func (gen *SqlGenPostgres) GetSqlListInclusive(tableName string) string {
+ return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName)
}
-func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string {
- return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+func (gen *SqlGenPostgres) GetSqlCreateTable(tableName string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName)
}
-func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string {
- return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+func (gen *SqlGenPostgres) GetSqlDropTable(tableName string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, tableName)
}
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
index 9e4ff7c32..a1e16a92a 100644
--- a/weed/filer/postgres/postgres_store.go
+++ b/weed/filer/postgres/postgres_store.go
@@ -29,6 +29,8 @@ func (store *PostgresStore) GetName() string {
func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
+ configuration.GetString(prefix+"upsertQuery"),
+ configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
@@ -42,12 +44,16 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
)
}
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
+func (store *PostgresStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = false
+ if !enableUpsert {
+ upsertQuery = ""
+ }
store.SqlGenerator = &SqlGenPostgres{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: `drop table "%s"`,
+ UpsertQueryTemplate: upsertQuery,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
index 92893bf7a..0f573d8d0 100644
--- a/weed/filer/postgres2/postgres2_store.go
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -32,6 +32,8 @@ func (store *PostgresStore2) GetName() string {
func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"upsertQuery"),
+ configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
@@ -45,12 +47,16 @@ func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix
)
}
-func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
+func (store *PostgresStore2) initialize(createTable, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = true
+ if !enableUpsert {
+ upsertQuery = ""
+ }
store.SqlGenerator = &postgres.SqlGenPostgres{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: `drop table "%s"`,
+ UpsertQueryTemplate: upsertQuery,
}
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 7a6da3beb..d92d526d5 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -27,7 +27,7 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
return err
}
- return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+ return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64, false)
}
@@ -35,14 +35,18 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
- data, _, err := util.FastGet(target)
+ data, _, err := util.Get(target)
return data, err
}
func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
-
- target := fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
+ var target string
+ if port == 0 {
+ target = fmt.Sprintf("http://%s%s/%s", host, dir, name)
+ } else {
+ target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
+ }
// set the HTTP method, url, and request body
req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 70c301725..379a18c62 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -8,6 +8,7 @@ import (
"crypto/md5"
"fmt"
"io"
+ "os"
"github.com/tecbot/gorocksdb"
@@ -56,6 +57,7 @@ func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, pre
func (store *RocksDBStore) initialize(dir string) (err error) {
glog.Infof("filer store rocksdb dir: %s", dir)
+ os.MkdirAll(dir, 0755)
if err := weed_util.TestFolderWritable(dir); err != nil {
return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
index 439663524..f6e755b4b 100644
--- a/weed/filer/rocksdb/rocksdb_store_test.go
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -53,14 +53,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -79,7 +79,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 6a87a2b7d..661a210ea 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "golang.org/x/sync/errgroup"
"io"
"math"
"strings"
@@ -13,9 +14,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, isCheck bool) error {
- // fmt.Printf("start to stream content for chunks: %+v\n", chunks)
+ glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -26,19 +27,33 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
+ } else if len(urlStrings) == 0 {
+ glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
}
fileId2Url[chunkView.FileId] = urlStrings
}
+ if isCheck {
+ // Pre-check all chunkViews urls
+ gErr := new(errgroup.Group)
+ CheckAllChunkViews(chunkViews, &fileId2Url, gErr)
+ if err := gErr.Wait(); err != nil {
+ glog.Errorf("check all chunks: %v", err)
+ return fmt.Errorf("check all chunks: %v", err)
+ }
+ return nil
+ }
+
for _, chunkView := range chunkViews {
urlStrings := fileId2Url[chunkView.FileId]
-
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
if err != nil {
glog.Errorf("read chunk: %v", err)
return fmt.Errorf("read chunk: %v", err)
}
+
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
@@ -50,6 +65,17 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
}
+func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) {
+ for _, chunkView := range chunkViews {
+ urlStrings := (*fileId2Url)[chunkView.FileId]
+ glog.V(9).Infof("Check chunk: %+v\n url: %v", chunkView, urlStrings)
+ gErr.Go(func() error {
+ _, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ return err
+ })
+ }
+}
+
// ---------------- ReadAllReader ----------------------------------
func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
@@ -181,7 +207,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {