diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-21 23:49:51 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-21 23:49:51 -0700 |
| commit | 35261c805efe56a05ae524b798e0158719cf7a04 (patch) | |
| tree | 563e07fbe996a9ed54e600ffb306f22552d628d6 /weed/filer | |
| parent | 99739fa5a9c20174a1739e51d3a8b4c4d2ebd3b1 (diff) | |
| parent | 7a6c559ab4a6b696bb574454b297ebefabec29ed (diff) | |
| download | seaweedfs-35261c805efe56a05ae524b798e0158719cf7a04.tar.xz seaweedfs-35261c805efe56a05ae524b798e0158719cf7a04.zip | |
Merge branch 'master' into messaging
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/abstract_sql/abstract_sql_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/leveldb3/leveldb3_store.go | 16 | ||||
| -rw-r--r-- | weed/filer/leveldb3/leveldb3_store_bucket.go | 23 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 7 | ||||
| -rw-r--r-- | weed/filer/mysql2/mysql2_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/postgres2/postgres2_store.go | 2 | ||||
| -rw-r--r-- | weed/filer/stream.go | 22 | ||||
| -rw-r--r-- | weed/filer/ydb/ydb_store.go | 2 |
8 files changed, 67 insertions, 9 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 13268b944..a159d5272 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -32,6 +32,8 @@ type AbstractSqlStore struct { dbsLock sync.Mutex } +var _ filer.BucketAware = (*AbstractSqlStore)(nil) + func (store *AbstractSqlStore) CanDropWholeBucket() bool { return store.SupportBucketTable } diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go index d21515bd4..8da4a9e7f 100644 --- a/weed/filer/leveldb3/leveldb3_store.go +++ b/weed/filer/leveldb3/leveldb3_store.go @@ -121,23 +121,31 @@ func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bo } store.dbsLock.RUnlock() - // upgrade to write lock + + db, err := store.createDB(bucket) + + return db, bucket, shortPath, err +} + +func (store *LevelDB3Store) createDB(bucket string) (*leveldb.DB, error) { + store.dbsLock.Lock() defer store.dbsLock.Unlock() // double check after getting the write lock if db, found := store.dbs[bucket]; found { - return db, bucket, shortPath, nil + return db, nil } // create db db, err := store.loadDB(bucket) if err != nil { - return nil, bucket, shortPath, err + return nil, err } + store.dbs[bucket] = db - return db, bucket, shortPath, nil + return db, nil } func (store *LevelDB3Store) closeDB(bucket string) { diff --git a/weed/filer/leveldb3/leveldb3_store_bucket.go b/weed/filer/leveldb3/leveldb3_store_bucket.go new file mode 100644 index 000000000..823fe363b --- /dev/null +++ b/weed/filer/leveldb3/leveldb3_store_bucket.go @@ -0,0 +1,23 @@ +package leveldb + +import ( + "github.com/chrislusf/seaweedfs/weed/filer" + "os" +) + +var _ filer.BucketAware = (*LevelDB3Store)(nil) + +func (store *LevelDB3Store) OnBucketCreation(bucket string) { + store.createDB(bucket) +} + +func (store *LevelDB3Store) OnBucketDeletion(bucket string) { + store.closeDB(bucket) + if bucket != "" { // just to make sure + os.RemoveAll(store.dir + "/" + bucket) + } +} + +func (store *LevelDB3Store) CanDropWholeBucket() bool { + return true +} diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index c672ce342..5799e247e 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" + "strings" "sync" "time" @@ -99,7 +100,11 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres return } if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) + errLvl := glog.Level(0) + if strings.Contains(err.Error(), "duplicated local subscription detected") { + errLvl = glog.Level(1) + } + glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } if lastTsNs < nextLastTsNs { lastTsNs = nextLastTsNs diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go index e50480150..792c79e44 100644 --- a/weed/filer/mysql2/mysql2_store.go +++ b/weed/filer/mysql2/mysql2_store.go @@ -18,6 +18,8 @@ const ( CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8" ) +var _ filer.BucketAware = (*MysqlStore2)(nil) + func init() { filer.Stores = append(filer.Stores, &MysqlStore2{}) } diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go index 0f573d8d0..3c57e4cb4 100644 --- a/weed/filer/postgres2/postgres2_store.go +++ b/weed/filer/postgres2/postgres2_store.go @@ -17,6 +17,8 @@ const ( CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30" ) +var _ filer.BucketAware = (*PostgresStore2)(nil) + func init() { filer.Stores = append(filer.Stores, &PostgresStore2{}) } diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 7da9fd0a0..d1b66e88d 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -18,6 +18,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) +var getLookupFileIdBackoffSchedule = []time.Duration{ + 150 * time.Millisecond, + 600 * time.Millisecond, + 1800 * time.Millisecond, +} + func HasData(entry *filer_pb.Entry) bool { if len(entry.Content) > 0 { @@ -69,14 +75,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ fileId2Url := make(map[string][]string) for _, chunkView := range chunkViews { - - urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId) + var urlStrings []string + var err error + for _, backoff := range getLookupFileIdBackoffSchedule { + urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId) + if err == nil && len(urlStrings) > 0 { + time.Sleep(backoff) + break + } + } if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } else if len(urlStrings) == 0 { - glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) - return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + glog.Error(errUrlNotFound) + return errUrlNotFound } fileId2Url[chunkView.FileId] = urlStrings } diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go index 1e3a55a09..d5751bb5a 100644 --- a/weed/filer/ydb/ydb_store.go +++ b/weed/filer/ydb/ydb_store.go @@ -320,6 +320,8 @@ func (store *YdbStore) Shutdown() { _ = store.DB.Close(context.Background()) } +var _ filer.BucketAware = (*YdbStore)(nil) + func (store *YdbStore) CanDropWholeBucket() bool { return store.SupportBucketTable } |
