aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-21 23:49:51 -0700
committerchrislu <chris.lu@gmail.com>2022-07-21 23:49:51 -0700
commit35261c805efe56a05ae524b798e0158719cf7a04 (patch)
tree563e07fbe996a9ed54e600ffb306f22552d628d6 /weed/filer
parent99739fa5a9c20174a1739e51d3a8b4c4d2ebd3b1 (diff)
parent7a6c559ab4a6b696bb574454b297ebefabec29ed (diff)
downloadseaweedfs-35261c805efe56a05ae524b798e0158719cf7a04.tar.xz
seaweedfs-35261c805efe56a05ae524b798e0158719cf7a04.zip
Merge branch 'master' into messaging
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go2
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go16
-rw-r--r--weed/filer/leveldb3/leveldb3_store_bucket.go23
-rw-r--r--weed/filer/meta_aggregator.go7
-rw-r--r--weed/filer/mysql2/mysql2_store.go2
-rw-r--r--weed/filer/postgres2/postgres2_store.go2
-rw-r--r--weed/filer/stream.go22
-rw-r--r--weed/filer/ydb/ydb_store.go2
8 files changed, 67 insertions, 9 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 13268b944..a159d5272 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -32,6 +32,8 @@ type AbstractSqlStore struct {
dbsLock sync.Mutex
}
+var _ filer.BucketAware = (*AbstractSqlStore)(nil)
+
func (store *AbstractSqlStore) CanDropWholeBucket() bool {
return store.SupportBucketTable
}
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index d21515bd4..8da4a9e7f 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -121,23 +121,31 @@ func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bo
}
store.dbsLock.RUnlock()
- // upgrade to write lock
+
+ db, err := store.createDB(bucket)
+
+ return db, bucket, shortPath, err
+}
+
+func (store *LevelDB3Store) createDB(bucket string) (*leveldb.DB, error) {
+
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
// double check after getting the write lock
if db, found := store.dbs[bucket]; found {
- return db, bucket, shortPath, nil
+ return db, nil
}
// create db
db, err := store.loadDB(bucket)
if err != nil {
- return nil, bucket, shortPath, err
+ return nil, err
}
+
store.dbs[bucket] = db
- return db, bucket, shortPath, nil
+ return db, nil
}
func (store *LevelDB3Store) closeDB(bucket string) {
diff --git a/weed/filer/leveldb3/leveldb3_store_bucket.go b/weed/filer/leveldb3/leveldb3_store_bucket.go
new file mode 100644
index 000000000..823fe363b
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_bucket.go
@@ -0,0 +1,23 @@
+package leveldb
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "os"
+)
+
+var _ filer.BucketAware = (*LevelDB3Store)(nil)
+
+func (store *LevelDB3Store) OnBucketCreation(bucket string) {
+ store.createDB(bucket)
+}
+
+func (store *LevelDB3Store) OnBucketDeletion(bucket string) {
+ store.closeDB(bucket)
+ if bucket != "" { // just to make sure
+ os.RemoveAll(store.dir + "/" + bucket)
+ }
+}
+
+func (store *LevelDB3Store) CanDropWholeBucket() bool {
+ return true
+}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index c672ce342..5799e247e 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
+ "strings"
"sync"
"time"
@@ -99,7 +100,11 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
return
}
if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
+ errLvl := glog.Level(0)
+ if strings.Contains(err.Error(), "duplicated local subscription detected") {
+ errLvl = glog.Level(1)
+ }
+ glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}
if lastTsNs < nextLastTsNs {
lastTsNs = nextLastTsNs
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index e50480150..792c79e44 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -18,6 +18,8 @@ const (
CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
)
+var _ filer.BucketAware = (*MysqlStore2)(nil)
+
func init() {
filer.Stores = append(filer.Stores, &MysqlStore2{})
}
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
index 0f573d8d0..3c57e4cb4 100644
--- a/weed/filer/postgres2/postgres2_store.go
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -17,6 +17,8 @@ const (
CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
)
+var _ filer.BucketAware = (*PostgresStore2)(nil)
+
func init() {
filer.Stores = append(filer.Stores, &PostgresStore2{})
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 7da9fd0a0..d1b66e88d 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -18,6 +18,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
+var getLookupFileIdBackoffSchedule = []time.Duration{
+ 150 * time.Millisecond,
+ 600 * time.Millisecond,
+ 1800 * time.Millisecond,
+}
+
func HasData(entry *filer_pb.Entry) bool {
if len(entry.Content) > 0 {
@@ -69,14 +75,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
-
- urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ var urlStrings []string
+ var err error
+ for _, backoff := range getLookupFileIdBackoffSchedule {
+ urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ if err == nil && len(urlStrings) > 0 {
+ time.Sleep(backoff)
+ break
+ }
+ }
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
} else if len(urlStrings) == 0 {
- glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
- return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+ glog.Error(errUrlNotFound)
+ return errUrlNotFound
}
fileId2Url[chunkView.FileId] = urlStrings
}
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index 1e3a55a09..d5751bb5a 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -320,6 +320,8 @@ func (store *YdbStore) Shutdown() {
_ = store.DB.Close(context.Background())
}
+var _ filer.BucketAware = (*YdbStore)(nil)
+
func (store *YdbStore) CanDropWholeBucket() bool {
return store.SupportBucketTable
}