diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:15:17 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-04-26 17:15:17 +0800 |
| commit | 4ee58922eff61a5a4ca29c0b4829b097a498549e (patch) | |
| tree | 301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/filer | |
| parent | 5cdff56731d6be220d4d6e8624b859520dfeb021 (diff) | |
| parent | 86185262bb86e31f9a2f71e85d02df2502c7ad40 (diff) | |
| download | seaweedfs-4ee58922eff61a5a4ca29c0b4829b097a498549e.tar.xz seaweedfs-4ee58922eff61a5a4ca29c0b4829b097a498549e.zip | |
Merge pull request #6 from chrislusf/master
Diffstat (limited to 'weed/filer')
28 files changed, 262 insertions, 101 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 07ce56145..ab8f6bcbd 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -13,15 +13,15 @@ import ( ) type SqlGenerator interface { - GetSqlInsert(bucket string) string - GetSqlUpdate(bucket string) string - GetSqlFind(bucket string) string - GetSqlDelete(bucket string) string - GetSqlDeleteFolderChildren(bucket string) string - GetSqlListExclusive(bucket string) string - GetSqlListInclusive(bucket string) string - GetSqlCreateTable(bucket string) string - GetSqlDropTable(bucket string) string + GetSqlInsert(tableName string) string + GetSqlUpdate(tableName string) string + GetSqlFind(tableName string) string + GetSqlDelete(tableName string) string + GetSqlDeleteFolderChildren(tableName string) string + GetSqlListExclusive(tableName string) string + GetSqlListInclusive(tableName string) string + GetSqlCreateTable(tableName string) string + GetSqlDropTable(tableName string) string } type AbstractSqlStore struct { @@ -32,6 +32,29 @@ type AbstractSqlStore struct { dbsLock sync.Mutex } +func (store *AbstractSqlStore) OnBucketCreation(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + store.CreateTable(context.Background(), bucket) + + if store.dbs == nil { + return + } + store.dbs[bucket] = true +} +func (store *AbstractSqlStore) OnBucketDeletion(bucket string) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + store.deleteTable(context.Background(), bucket) + + if store.dbs == nil { + return + } + delete(store.dbs, bucket) +} + const ( DEFAULT_TABLE = "filemeta" ) @@ -253,7 +276,9 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat } } - res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath) + glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath))) + + res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath)) if err != nil { return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 99a62c90c..c709dc819 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -97,20 +97,20 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { var err error - var buffer bytes.Buffer var shouldRetry bool + receivedData := make([]byte, 0, size) for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { - buffer.Write(data) + receivedData = receivedData[:0] + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + receivedData = append(receivedData, data...) }) if !shouldRetry { break } if err != nil { glog.V(0).Infof("read %s failed, err: %v", urlString, err) - buffer.Reset() } else { break } @@ -123,7 +123,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } } - return buffer.Bytes(), err + return receivedData, err + } func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index e59887763..effdc0e4e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -241,12 +241,12 @@ func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err er if oldEntry != nil { entry.Attr.Crtime = oldEntry.Attr.Crtime if oldEntry.IsDirectory() && !entry.IsDirectory() { - glog.Errorf("existing %s is a directory", entry.FullPath) - return fmt.Errorf("existing %s is a directory", entry.FullPath) + glog.Errorf("existing %s is a directory", oldEntry.FullPath) + return fmt.Errorf("existing %s is a directory", oldEntry.FullPath) } if !oldEntry.IsDirectory() && entry.IsDirectory() { - glog.Errorf("existing %s is a file", entry.FullPath) - return fmt.Errorf("existing %s is a file", entry.FullPath) + glog.Errorf("existing %s is a file", oldEntry.FullPath) + return fmt.Errorf("existing %s is a file", oldEntry.FullPath) } } return f.Store.UpdateEntry(ctx, entry) diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go index ba170f02e..43fb000c9 100644 --- a/weed/filer/filer_buckets.go +++ b/weed/filer/filer_buckets.go @@ -29,7 +29,7 @@ func (f *Filer) LoadBuckets() { limit := int64(math.MaxInt32) - entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "") + entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "", "") if err != nil { glog.V(1).Infof("no buckets found: %v", err) diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index 8e549f5ad..ab5afc5cc 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -18,6 +18,7 @@ const ( FilerConfName = "filer.conf" IamConfigDirecotry = "/etc/iam" IamIdentityFile = "identity.json" + IamPoliciesFile = "policies.json" ) type FilerConf struct { diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 50a669f40..3ef3cfff9 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -11,6 +11,10 @@ import ( type HardLinkId []byte +const ( + MsgFailDelNonEmptyFolder = "fail to delete non-empty folder" +) + func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil @@ -69,7 +73,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry includeLastFile := false if !isDeletingBucket { for { - entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "") + entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "") if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) @@ -77,7 +81,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry if lastFileName == "" && !isRecursive && len(entries) > 0 { // only for first iteration in the loop glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath) } for _, sub := range entries { diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index c461a82b8..7ab101102 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -116,13 +116,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( sizeBuf := make([]byte, 4) startTsNs := startTime.UnixNano() - dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "") + dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "", "") if listDayErr != nil { return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr) } for _, dayEntry := range dayEntries { // println("checking day", dayEntry.FullPath) - hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "") + hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "") if listHourMinuteErr != nil { return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr) } diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index 295a5039e..a91faeb24 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -11,6 +11,28 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { + f.maybeReloadFilerConfiguration(event) + f.onBucketEvents(event) +} + +func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { + message := event.EventNotification + for _, sig := range message.Signatures { + if sig == f.Signature { + return + } + } + if f.DirBucketsPath == event.Directory { + if message.OldEntry == nil && message.NewEntry != nil { + f.Store.OnBucketCreation(message.NewEntry.Name) + } + if message.OldEntry != nil && message.NewEntry == nil { + f.Store.OnBucketDeletion(message.OldEntry.Name) + } + } +} + +func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { if DirectoryEtcSeaweedFS != event.Directory { if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath { return @@ -26,12 +48,11 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) if entry.Name == FilerConfName { f.reloadFilerConfiguration(entry) } - } func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) { var buf bytes.Buffer - err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64) + err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64, false) if err != nil { return nil, err } diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 0a14d3756..f43312cfa 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -20,9 +20,9 @@ func splitPattern(pattern string) (prefix string, restPattern string) { } // For now, prefix and namePattern are mutually exclusive -func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string) (entries []*Entry, hasMore bool, err error) { +func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error) { - _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, func(entry *Entry) bool { + _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool { entries = append(entries, entry) return true }) @@ -36,7 +36,7 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start } // For now, prefix and namePattern are mutually exclusive -func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { +func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { p = p[0 : len(p)-1] } @@ -47,30 +47,38 @@ func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, } var missedCount int64 - missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc) + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, namePatternExclude, eachEntryFunc) for missedCount > 0 && err == nil { - missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc) + missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, namePatternExclude, eachEntryFunc) } return } -func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) { +func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) { - if len(restNamePattern) == 0 { + if len(restNamePattern) == 0 && len(namePatternExclude) == 0{ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc) return 0, lastFileName, err } lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { - nameToTest := strings.ToLower(entry.Name()) - if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched { - if !eachEntryFunc(entry) { - return false + nameToTest := entry.Name() + if len(namePatternExclude) > 0 { + if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched { + missedCount++ + return true } - } else { - missedCount++ + } + if len(restNamePattern) > 0 { + if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched { + missedCount++ + return true + } + } + if !eachEntryFunc(entry) { + return false } return true }) diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 8955a25c7..a5b2f25de 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -39,3 +39,8 @@ type FilerStore interface { Shutdown() } + +type BucketAware interface { + OnBucketCreation(bucket string) + OnBucketDeletion(bucket string) +} diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go index 64baac371..cd7c0bea3 100644 --- a/weed/filer/filerstore_wrapper.go +++ b/weed/filer/filerstore_wrapper.go @@ -21,6 +21,8 @@ type VirtualFilerStore interface { DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error DeleteOneEntry(ctx context.Context, entry *Entry) error AddPathSpecificStore(path string, storeId string, store FilerStore) + OnBucketCreation(bucket string) + OnBucketDeletion(bucket string) } type FilerStoreWrapper struct { @@ -40,6 +42,27 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper { } } +func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) { + for _, store := range fsw.storeIdToStore { + if ba, ok := store.(BucketAware); ok { + ba.OnBucketCreation(bucket) + } + } + if ba, ok := fsw.defaultStore.(BucketAware); ok { + ba.OnBucketCreation(bucket) + } +} +func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) { + for _, store := range fsw.storeIdToStore { + if ba, ok := store.(BucketAware); ok { + ba.OnBucketDeletion(bucket) + } + } + if ba, ok := fsw.defaultStore.(BucketAware); ok { + ba.OnBucketDeletion(bucket) + } +} + func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) { fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store) err := fsw.pathToStore.Put([]byte(path), storeId) @@ -126,8 +149,8 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "find").Observe(time.Since(start).Seconds()) }() - glog.V(4).Infof("FindEntry %s", fp) entry, err = actualStore.FindEntry(ctx, fp) + glog.V(4).Infof("FindEntry %s: %v", fp, err) if err != nil { return nil, err } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index ac0ad4ba6..ce454f36a 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -8,6 +8,7 @@ import ( leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" leveldb_util "github.com/syndtr/goleveldb/leveldb/util" + "os" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -38,6 +39,7 @@ func (store *LevelDBStore) Initialize(configuration weed_util.Configuration, pre func (store *LevelDBStore) initialize(dir string) (err error) { glog.Infof("filer store dir: %s", dir) + os.MkdirAll(dir, 0755) if err := weed_util.TestFolderWritable(dir); err != nil { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index 9c342605e..d437895f5 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -51,14 +51,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") + entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -77,7 +77,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index c76340da4..124d61c1c 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -38,6 +38,7 @@ func (store *LevelDB2Store) Initialize(configuration weed_util.Configuration, pr func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) { glog.Infof("filer store leveldb2 dir: %s", dir) + os.MkdirAll(dir, 0755) if err := weed_util.TestFolderWritable(dir); err != nil { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index 495c73fdd..fd0ad18a3 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") + entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index c81ea7bbe..d1cdfbbf6 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -45,6 +45,7 @@ func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, pr func (store *LevelDB3Store) initialize(dir string) (err error) { glog.Infof("filer store leveldb3 dir: %s", dir) + os.MkdirAll(dir, 0755) if err := weed_util.TestFolderWritable(dir); err != nil { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go index 53b0e927f..0b970a539 100644 --- a/weed/filer/leveldb3/leveldb3_store_test.go +++ b/weed/filer/leveldb3/leveldb3_store_test.go @@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") + entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 9437e9992..5c368a57e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -69,6 +69,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string peerSignature, err = ma.readFilerStoreSignature(peer) } + // when filer store is not shared by multiple filers if peerSignature != f.Signature { if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { lastTsNs = prevTsNs diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go index 4213cf965..93d3e3f9e 100644 --- a/weed/filer/mysql/mysql_sql_gen.go +++ b/weed/filer/mysql/mysql_sql_gen.go @@ -2,6 +2,7 @@ package mysql import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" _ "github.com/go-sql-driver/mysql" ) @@ -9,44 +10,49 @@ import ( type SqlGenMysql struct { CreateTableSqlTemplate string DropTableSqlTemplate string + UpsertQueryTemplate string } var ( _ = abstract_sql.SqlGenerator(&SqlGenMysql{}) ) -func (gen *SqlGenMysql) GetSqlInsert(bucket string) string { - return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket) +func (gen *SqlGenMysql) GetSqlInsert(tableName string) string { + if gen.UpsertQueryTemplate != "" { + return fmt.Sprintf(gen.UpsertQueryTemplate, tableName) + } else { + return fmt.Sprintf("INSERT INTO `%s` (dirhash,name,directory,meta) VALUES(?,?,?,?)", tableName) + } } -func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string { - return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket) +func (gen *SqlGenMysql) GetSqlUpdate(tableName string) string { + return fmt.Sprintf("UPDATE `%s` SET meta=? WHERE dirhash=? AND name=? AND directory=?", tableName) } -func (gen *SqlGenMysql) GetSqlFind(bucket string) string { - return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket) +func (gen *SqlGenMysql) GetSqlFind(tableName string) string { + return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash=? AND name=? AND directory=?", tableName) } -func (gen *SqlGenMysql) GetSqlDelete(bucket string) string { - return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", bucket) +func (gen *SqlGenMysql) GetSqlDelete(tableName string) string { + return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND name=? AND directory=?", tableName) } -func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string { - return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", bucket) +func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(tableName string) string { + return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash=? AND directory=?", tableName) } -func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) +func (gen *SqlGenMysql) GetSqlListExclusive(tableName string) string { + return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", tableName) } -func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string { - return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket) +func (gen *SqlGenMysql) GetSqlListInclusive(tableName string) string { + return fmt.Sprintf("SELECT NAME, meta FROM `%s` WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", tableName) } -func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string { - return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket) +func (gen *SqlGenMysql) GetSqlCreateTable(tableName string) string { + return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName) } -func (gen *SqlGenMysql) GetSqlDropTable(bucket string) string { - return fmt.Sprintf(gen.DropTableSqlTemplate, bucket) +func (gen *SqlGenMysql) GetSqlDropTable(tableName string) string { + return fmt.Sprintf(gen.DropTableSqlTemplate, tableName) } diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index 501ab1d39..fbaa4d5f9 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -3,9 +3,10 @@ package mysql import ( "database/sql" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "time" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql" "github.com/chrislusf/seaweedfs/weed/util" _ "github.com/go-sql-driver/mysql" @@ -29,6 +30,8 @@ func (store *MysqlStore) GetName() string { func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( + configuration.GetString(prefix+"upsertQuery"), + configuration.GetBool(prefix+"enableUpsert"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetString(prefix+"hostname"), @@ -41,13 +44,17 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str ) } -func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen, +func (store *MysqlStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen, maxLifetimeSeconds int, interpolateParams bool) (err error) { store.SupportBucketTable = false + if !enableUpsert { + upsertQuery = "" + } store.SqlGenerator = &SqlGenMysql{ CreateTableSqlTemplate: "", DropTableSqlTemplate: "drop table `%s`", + UpsertQueryTemplate: upsertQuery, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go index c796cd6aa..a1f54455a 100644 --- a/weed/filer/mysql2/mysql2_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -32,6 +32,8 @@ func (store *MysqlStore2) GetName() string { func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( configuration.GetString(prefix+"createTable"), + configuration.GetString(prefix+"upsertQuery"), + configuration.GetBool(prefix+"enableUpsert"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetString(prefix+"hostname"), @@ -44,13 +46,17 @@ func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix st ) } -func (store *MysqlStore2) initialize(createTable, user, password, hostname string, port int, database string, maxIdle, maxOpen, +func (store *MysqlStore2) initialize(createTable, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen, maxLifetimeSeconds int, interpolateParams bool) (err error) { store.SupportBucketTable = true + if !enableUpsert { + upsertQuery = "" + } store.SqlGenerator = &mysql.SqlGenMysql{ CreateTableSqlTemplate: createTable, DropTableSqlTemplate: "drop table `%s`", + UpsertQueryTemplate: upsertQuery, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go index e13070c3d..6cee3d2da 100644 --- a/weed/filer/postgres/postgres_sql_gen.go +++ b/weed/filer/postgres/postgres_sql_gen.go @@ -10,44 +10,49 @@ import ( type SqlGenPostgres struct { CreateTableSqlTemplate string DropTableSqlTemplate string + UpsertQueryTemplate string } var ( _ = abstract_sql.SqlGenerator(&SqlGenPostgres{}) ) -func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string { - return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, bucket) +func (gen *SqlGenPostgres) GetSqlInsert(tableName string) string { + if gen.UpsertQueryTemplate != "" { + return fmt.Sprintf(gen.UpsertQueryTemplate, tableName) + } else { + return fmt.Sprintf(`INSERT INTO "%s" (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)`, tableName) + } } -func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string { - return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, bucket) +func (gen *SqlGenPostgres) GetSqlUpdate(tableName string) string { + return fmt.Sprintf(`UPDATE "%s" SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4`, tableName) } -func (gen *SqlGenPostgres) GetSqlFind(bucket string) string { - return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket) +func (gen *SqlGenPostgres) GetSqlFind(tableName string) string { + return fmt.Sprintf(`SELECT meta FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, tableName) } -func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string { - return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, bucket) +func (gen *SqlGenPostgres) GetSqlDelete(tableName string) string { + return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND name=$2 AND directory=$3`, tableName) } -func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string { - return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, bucket) +func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(tableName string) string { + return fmt.Sprintf(`DELETE FROM "%s" WHERE dirhash=$1 AND directory=$2`, tableName) } -func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string { - return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket) +func (gen *SqlGenPostgres) GetSqlListExclusive(tableName string) string { + return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName) } -func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string { - return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, bucket) +func (gen *SqlGenPostgres) GetSqlListInclusive(tableName string) string { + return fmt.Sprintf(`SELECT NAME, meta FROM "%s" WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5`, tableName) } -func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string { - return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket) +func (gen *SqlGenPostgres) GetSqlCreateTable(tableName string) string { + return fmt.Sprintf(gen.CreateTableSqlTemplate, tableName) } -func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string { - return fmt.Sprintf(gen.DropTableSqlTemplate, bucket) +func (gen *SqlGenPostgres) GetSqlDropTable(tableName string) string { + return fmt.Sprintf(gen.DropTableSqlTemplate, tableName) } diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go index 9e4ff7c32..a1e16a92a 100644 --- a/weed/filer/postgres/postgres_store.go +++ b/weed/filer/postgres/postgres_store.go @@ -29,6 +29,8 @@ func (store *PostgresStore) GetName() string { func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( + configuration.GetString(prefix+"upsertQuery"), + configuration.GetBool(prefix+"enableUpsert"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetString(prefix+"hostname"), @@ -42,12 +44,16 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix ) } -func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { +func (store *PostgresStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { store.SupportBucketTable = false + if !enableUpsert { + upsertQuery = "" + } store.SqlGenerator = &SqlGenPostgres{ CreateTableSqlTemplate: "", DropTableSqlTemplate: `drop table "%s"`, + UpsertQueryTemplate: upsertQuery, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go index 92893bf7a..0f573d8d0 100644 --- a/weed/filer/postgres2/postgres2_store.go +++ b/weed/filer/postgres2/postgres2_store.go @@ -32,6 +32,8 @@ func (store *PostgresStore2) GetName() string { func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( configuration.GetString(prefix+"createTable"), + configuration.GetString(prefix+"upsertQuery"), + configuration.GetBool(prefix+"enableUpsert"), configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetString(prefix+"hostname"), @@ -45,12 +47,16 @@ func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix ) } -func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { +func (store *PostgresStore2) initialize(createTable, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) { store.SupportBucketTable = true + if !enableUpsert { + upsertQuery = "" + } store.SqlGenerator = &postgres.SqlGenPostgres{ CreateTableSqlTemplate: createTable, DropTableSqlTemplate: `drop table "%s"`, + UpsertQueryTemplate: upsertQuery, } sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode) diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go index 7a6da3beb..d92d526d5 100644 --- a/weed/filer/read_write.go +++ b/weed/filer/read_write.go @@ -27,7 +27,7 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed return err } - return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64, false) } @@ -35,14 +35,18 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) { target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name) - data, _, err := util.FastGet(target) + data, _, err := util.Get(target) return data, err } func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error { - - target := fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name) + var target string + if port == 0 { + target = fmt.Sprintf("http://%s%s/%s", host, dir, name) + } else { + target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name) + } // set the HTTP method, url, and request body req, err := http.NewRequest(http.MethodPut, target, byteBuffer) diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go index 70c301725..379a18c62 100644 --- a/weed/filer/rocksdb/rocksdb_store.go +++ b/weed/filer/rocksdb/rocksdb_store.go @@ -8,6 +8,7 @@ import ( "crypto/md5" "fmt" "io" + "os" "github.com/tecbot/gorocksdb" @@ -56,6 +57,7 @@ func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, pre func (store *RocksDBStore) initialize(dir string) (err error) { glog.Infof("filer store rocksdb dir: %s", dir) + os.MkdirAll(dir, 0755) if err := weed_util.TestFolderWritable(dir); err != nil { return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err) } diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go index 439663524..f6e755b4b 100644 --- a/weed/filer/rocksdb/rocksdb_store_test.go +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -53,14 +53,14 @@ func TestCreateAndFind(t *testing.T) { } // checking one upper directory - entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "") + entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return } // checking one upper directory - entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if len(entries) != 1 { t.Errorf("list entries count: %v", len(entries)) return @@ -79,7 +79,7 @@ func TestEmptyRoot(t *testing.T) { ctx := context.Background() // checking one upper directory - entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "") + entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "", "") if err != nil { t.Errorf("list entries: %v", err) return diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 6a87a2b7d..661a210ea 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "fmt" + "golang.org/x/sync/errgroup" "io" "math" "strings" @@ -13,9 +14,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) -func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { +func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, isCheck bool) error { - // fmt.Printf("start to stream content for chunks: %+v\n", chunks) + glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -26,19 +27,33 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err + } else if len(urlStrings) == 0 { + glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) } fileId2Url[chunkView.FileId] = urlStrings } + if isCheck { + // Pre-check all chunkViews urls + gErr := new(errgroup.Group) + CheckAllChunkViews(chunkViews, &fileId2Url, gErr) + if err := gErr.Wait(); err != nil { + glog.Errorf("check all chunks: %v", err) + return fmt.Errorf("check all chunks: %v", err) + } + return nil + } + for _, chunkView := range chunkViews { urlStrings := fileId2Url[chunkView.FileId] - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) if err != nil { glog.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err) } + _, err = w.Write(data) if err != nil { glog.Errorf("write chunk: %v", err) @@ -50,6 +65,17 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c } +func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) { + for _, chunkView := range chunkViews { + urlStrings := (*fileId2Url)[chunkView.FileId] + glog.V(9).Infof("Check chunk: %+v\n url: %v", chunkView, urlStrings) + gErr.Go(func() error { + _, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + return err + }) + } +} + // ---------------- ReadAllReader ---------------------------------- func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { @@ -181,7 +207,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { |
