Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go      5
-rw-r--r--  weed/filer/cassandra/cassandra_store.go           11
-rw-r--r--  weed/filer/entry.go                               77
-rw-r--r--  weed/filer/entry_codec.go                         23
-rw-r--r--  weed/filer/filechunk_manifest.go                  11
-rw-r--r--  weed/filer/filechunks.go                          12
-rw-r--r--  weed/filer/filechunks_test.go                      4
-rw-r--r--  weed/filer/filer.go                                6
-rw-r--r--  weed/filer/filer_buckets.go                        4
-rw-r--r--  weed/filer/filer_conf.go                           4
-rw-r--r--  weed/filer/filer_conf_test.go                     15
-rw-r--r--  weed/filer/filer_delete_entry.go                   2
-rw-r--r--  weed/filer/filer_on_meta_event.go                 22
-rw-r--r--  weed/filer/filer_remote_storage.go               197
-rw-r--r--  weed/filer/filer_remote_storage_test.go           34
-rw-r--r--  weed/filer/filer_search.go                         5
-rw-r--r--  weed/filer/filerstore.go                           1
-rw-r--r--  weed/filer/filerstore_wrapper.go                   8
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go              7
-rw-r--r--  weed/filer/leveldb3/leveldb3_store.go             14
-rw-r--r--  weed/filer/meta_aggregator.go                      3
-rw-r--r--  weed/filer/mongodb/mongodb_store_kv.go            11
-rw-r--r--  weed/filer/read_remote.go                         45
-rw-r--r--  weed/filer/read_write.go                          53
-rw-r--r--  weed/filer/rocksdb/rocksdb_store.go                8
-rw-r--r--  weed/filer/s3iam_conf.go                           3
-rw-r--r--  weed/filer/s3iam_conf_test.go                      2
-rw-r--r--  weed/filer/sqlite/sqlite_store_unsupported.go      2
-rw-r--r--  weed/filer/stream.go                             113
29 files changed, 539 insertions, 163 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index ab8f6bcbd..4bf9b16fa 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -32,6 +32,9 @@ type AbstractSqlStore struct {
dbsLock sync.Mutex
}
+func (store *AbstractSqlStore) CanDropWholeBucket() bool {
+ return store.SupportBucketTable
+}
func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
@@ -277,7 +280,6 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
}
glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
-
res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
@@ -287,7 +289,6 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
if err != nil {
return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
}
-
return nil
}
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fd2ce91a6..fc0b52ac7 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -32,6 +32,7 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ configuration.GetString(prefix+"localDC"),
)
}
@@ -40,13 +41,19 @@ func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string,
return
}
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
}
store.cluster.Keyspace = keyspace
+ fallback := gocql.RoundRobinHostPolicy()
+ if localDC != "" {
+ fallback = gocql.DCAwareRoundRobinPolicy(localDC)
+ }
+ store.cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
store.cluster.Consistency = gocql.LocalQuorum
+
store.session, err = store.cluster.CreateSession()
if err != nil {
glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
@@ -117,7 +124,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
- dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+ dir, name).Scan(&data); err != nil {
if err != gocql.ErrNotFound {
return nil, filer_pb.ErrNotFound
}
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index b7c8370e6..8fa75fe6b 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -42,6 +42,7 @@ type Entry struct {
HardLinkId HardLinkId
HardLinkCounter int32
Content []byte
+ Remote *filer_pb.RemoteEntry
}
func (entry *Entry) Size() uint64 {
@@ -56,20 +57,55 @@ func (entry *Entry) Timestamp() time.Time {
}
}
+func (entry *Entry) ShallowClone() *Entry {
+ if entry == nil {
+ return nil
+ }
+ newEntry := &Entry{}
+ newEntry.FullPath = entry.FullPath
+ newEntry.Attr = entry.Attr
+ newEntry.Chunks = entry.Chunks
+ newEntry.Extended = entry.Extended
+ newEntry.HardLinkId = entry.HardLinkId
+ newEntry.HardLinkCounter = entry.HardLinkCounter
+ newEntry.Content = entry.Content
+ newEntry.Remote = entry.Remote
+
+ return newEntry
+}
+
func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
if entry == nil {
return nil
}
- return &filer_pb.Entry{
- Name: entry.FullPath.Name(),
- IsDirectory: entry.IsDirectory(),
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
- HardLinkCounter: entry.HardLinkCounter,
- Content: entry.Content,
+ message := &filer_pb.Entry{}
+ message.Name = entry.FullPath.Name()
+ entry.ToExistingProtoEntry(message)
+ return message
+}
+
+func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
+ if entry == nil {
+ return
}
+ message.IsDirectory = entry.IsDirectory()
+ message.Attributes = EntryAttributeToPb(entry)
+ message.Chunks = entry.Chunks
+ message.Extended = entry.Extended
+ message.HardLinkId = entry.HardLinkId
+ message.HardLinkCounter = entry.HardLinkCounter
+ message.Content = entry.Content
+ message.RemoteEntry = entry.Remote
+}
+
+func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
+ fsEntry.Attr = PbToEntryAttribute(message.Attributes)
+ fsEntry.Chunks = message.Chunks
+ fsEntry.Extended = message.Extended
+ fsEntry.HardLinkId = HardLinkId(message.HardLinkId)
+ fsEntry.HardLinkCounter = message.HardLinkCounter
+ fsEntry.Content = message.Content
+ fsEntry.Remote = message.RemoteEntry
}
func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
@@ -83,26 +119,11 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
}
}
-func (entry *Entry) Clone() *Entry {
- return &Entry{
- FullPath: entry.FullPath,
- Attr: entry.Attr,
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
- HardLinkCounter: entry.HardLinkCounter,
- }
-}
-
func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
- return &Entry{
- FullPath: util.NewFullPath(dir, entry.Name),
- Attr: PbToEntryAttribute(entry.Attributes),
- Chunks: entry.Chunks,
- HardLinkId: HardLinkId(entry.HardLinkId),
- HardLinkCounter: entry.HardLinkCounter,
- Content: entry.Content,
- }
+ t := &Entry{}
+ t.FullPath = util.NewFullPath(dir, entry.Name)
+ FromPbEntryToExistingEntry(entry, t)
+ return t
}
func maxUint64(x, y uint64) uint64 {
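A minimal sketch of how the new helpers compose, with a hypothetical path and remote size. ShallowClone is a field-by-field copy, so slices like Chunks are shared rather than deep-copied, and the proto round-trip now carries the new Remote field (mapped to filer_pb.Entry.RemoteEntry):

    package main

    import (
        "github.com/chrislusf/seaweedfs/weed/filer"
        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func main() {
        entry := &filer.Entry{
            FullPath: util.FullPath("/buckets/b1/file.txt"), // hypothetical
            Remote:   &filer_pb.RemoteEntry{RemoteSize: 1024},
        }
        clone := entry.ShallowClone() // shares entry.Chunks, entry.Extended

        // Round-trip through the protobuf form; Remote survives both ways.
        message := entry.ToProtoEntry()
        back := filer.FromPbEntry("/buckets/b1", message)
        _, _ = clone, back
    }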
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 4c613f068..55c937b39 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -12,14 +12,8 @@ import (
)
func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
- message := &filer_pb.Entry{
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
- HardLinkCounter: entry.HardLinkCounter,
- Content: entry.Content,
- }
+ message := &filer_pb.Entry{}
+ entry.ToExistingProtoEntry(message)
return proto.Marshal(message)
}
@@ -31,15 +25,7 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
}
- entry.Attr = PbToEntryAttribute(message.Attributes)
-
- entry.Extended = message.Extended
-
- entry.Chunks = message.Chunks
-
- entry.HardLinkId = message.HardLinkId
- entry.HardLinkCounter = message.HardLinkCounter
- entry.Content = message.Content
+ FromPbEntryToExistingEntry(message, entry)
return nil
}
@@ -129,6 +115,9 @@ func EqualEntry(a, b *Entry) bool {
if !bytes.Equal(a.Content, b.Content) {
return false
}
+ if !proto.Equal(a.Remote, b.Remote) {
+ return false
+ }
return true
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index c709dc819..0c6b0cfe3 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -16,7 +16,7 @@ import (
)
const (
- ManifestBatch = 1000
+ ManifestBatch = 10000
)
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
@@ -39,9 +39,14 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
+
+ if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
+ continue
+ }
+
if !chunk.IsChunkManifest {
dataChunks = append(dataChunks, chunk)
continue
@@ -54,7 +59,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
manifestChunks = append(manifestChunks, chunk)
// recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
+ dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
return chunks, nil, subErr
}
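The new startOffset/stopOffset pair lets callers resolve only the chunks that overlap a byte range, so manifest chunks wholly outside the range are never fetched or recursed into. A minimal sketch with two plain data chunks; the nil lookup function is safe because nothing here is a manifest chunk (the package's own tests call it the same way):

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/filer"
        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
        chunks := []*filer_pb.FileChunk{
            {FileId: "1,ab", Offset: 0, Size: 1 << 20},       // [0 MiB, 1 MiB)
            {FileId: "2,cd", Offset: 1 << 20, Size: 1 << 20}, // [1 MiB, 2 MiB)
        }
        // Only the second chunk overlaps [1 MiB, 2 MiB).
        dataChunks, _, err := filer.ResolveChunkManifest(nil, chunks, 1<<20, 2<<20)
        fmt.Println(len(dataChunks), err) // 1 <nil>
    }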
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 346eb3cfb..0dc03f6e2 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -53,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
fileIds := make(map[string]bool)
for _, interval := range visibles {
@@ -72,11 +72,11 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+ aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
if aErr != nil {
return nil, aErr
}
- bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+ bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
if bErr != nil {
return nil, bErr
}
@@ -117,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
return ViewFromVisibleIntervals(visibles, offset, size)
@@ -221,9 +221,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
- chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
+ chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
index 699e7e298..b0ea20848 100644
--- a/weed/filer/filechunks_test.go
+++ b/weed/filer/filechunks_test.go
@@ -90,7 +90,7 @@ func TestRandomFileChunksCompact(t *testing.T) {
}
}
- visibles, _ := NonOverlappingVisibleIntervals(nil, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
for _, v := range visibles {
for x := v.start; x < v.stop; x++ {
@@ -227,7 +227,7 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
+ intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
for x, interval := range intervals {
log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
i, x, interval.start, interval.stop, interval.fileId)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index effdc0e4e..1a20abefc 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -42,6 +42,7 @@ type Filer struct {
MetaAggregator *MetaAggregator
Signature int32
FilerConf *FilerConf
+ RemoteStorage *FilerRemoteStorage
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -51,8 +52,9 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
+ RemoteStorage: NewFilerRemoteStorage(),
}
- f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
+ f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
@@ -207,7 +209,7 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
Attr: Attr{
Mtime: now,
Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
+ Mode: os.ModeDir | entry.Mode | 0111,
Uid: entry.Uid,
Gid: entry.Gid,
Collection: entry.Collection,
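Note on the mode fix: octal 0110 set the directory search (execute) bit only for owner and group, so other users could not traverse auto-created parent directories; 0111 grants it to other as well.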
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index 43fb000c9..38a1abadb 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"math"
+ "strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -78,6 +79,9 @@ func (f *Filer) isBucket(entry *Entry) bool {
if parent != f.DirBucketsPath {
return false
}
+ if strings.HasPrefix(dirName, ".") {
+ return false
+ }
f.buckets.RLock()
defer f.buckets.RUnlock()
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index ab5afc5cc..c58b26dc2 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -15,6 +15,7 @@ import (
const (
DirectoryEtcRoot = "/etc"
DirectoryEtcSeaweedFS = "/etc/seaweedfs"
+ DirectoryEtcRemote = "/etc/remote"
FilerConfName = "filer.conf"
IamConfigDirecotry = "/etc/iam"
IamIdentityFile = "identity.json"
@@ -126,6 +127,9 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
if b.VolumeGrowthCount > 0 {
a.VolumeGrowthCount = b.VolumeGrowthCount
}
+ if b.ReadOnly {
+ a.ReadOnly = b.ReadOnly
+ }
}
func (fc *FilerConf) ToProto() *filer_pb.FilerConf {
diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go
index ff868a3ec..1576c7d82 100644
--- a/weed/filer/filer_conf_test.go
+++ b/weed/filer/filer_conf_test.go
@@ -24,6 +24,18 @@ func TestFilerConf(t *testing.T) {
LocationPrefix: "/buckets/",
Replication: "001",
},
+ {
+ LocationPrefix: "/buckets",
+ ReadOnly: false,
+ },
+ {
+ LocationPrefix: "/buckets/xxx",
+ ReadOnly: true,
+ },
+ {
+ LocationPrefix: "/buckets/xxx/yyy",
+ ReadOnly: false,
+ },
}}
fc.doLoadConf(conf)
@@ -31,4 +43,7 @@ func TestFilerConf(t *testing.T) {
assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection)
assert.Equal(t, "001", fc.MatchStorageRule("/buckets/abc/jasdf").Replication)
+ assert.Equal(t, true, fc.MatchStorageRule("/buckets/xxx/yyy/zzz").ReadOnly)
+ assert.Equal(t, false, fc.MatchStorageRule("/buckets/other").ReadOnly)
+
}
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 3ef3cfff9..35187d034 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -71,7 +71,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName := ""
includeLastFile := false
- if !isDeletingBucket {
+ if !isDeletingBucket || !f.Store.CanDropWholeBucket() {
for {
entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "")
if err != nil {
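With this guard, the entry-by-entry deletion loop is skipped only when a whole bucket is being deleted and the store reports CanDropWholeBucket(); such stores (e.g. SQL stores with per-bucket tables) can then drop the bucket's data wholesale via the OnBucketDeletion hook.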
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index c9f75a5ca..34ac5321a 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -12,6 +12,7 @@ import (
// onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
f.maybeReloadFilerConfiguration(event)
+ f.maybeReloadRemoteStorageConfigurationAndMapping(event)
f.onBucketEvents(event)
}
@@ -24,10 +25,14 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
}
if f.DirBucketsPath == event.Directory {
if message.OldEntry == nil && message.NewEntry != nil {
- f.Store.OnBucketCreation(message.NewEntry.Name)
+ if message.NewEntry.IsDirectory {
+ f.Store.OnBucketCreation(message.NewEntry.Name)
+ }
}
if message.OldEntry != nil && message.NewEntry == nil {
- f.Store.OnBucketDeletion(message.OldEntry.Name)
+ if message.OldEntry.IsDirectory {
+ f.Store.OnBucketDeletion(message.OldEntry.Name)
+ }
}
}
}
@@ -80,3 +85,16 @@ func (f *Filer) LoadFilerConf() {
}
f.FilerConf = fc
}
+
+////////////////////////////////////
+// load and maintain remote storages
+////////////////////////////////////
+func (f *Filer) LoadRemoteStorageConfAndMapping() {
+ if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil {
+ glog.Errorf("read remote conf and mapping: %v", err)
+ return
+ }
+}
+func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) {
+ // FIXME add reloading
+}
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
new file mode 100644
index 000000000..f09658015
--- /dev/null
+++ b/weed/filer/filer_remote_storage.go
@@ -0,0 +1,197 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+ "math"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/viant/ptrie"
+)
+
+const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
+const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
+
+type FilerRemoteStorage struct {
+ rules ptrie.Trie
+ storageNameToConf map[string]*filer_pb.RemoteConf
+}
+
+func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
+ rs = &FilerRemoteStorage{
+ rules: ptrie.New(),
+ storageNameToConf: make(map[string]*filer_pb.RemoteConf),
+ }
+ return rs
+}
+
+func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
+ // execute this on filer
+
+ entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil
+ }
+ glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
+ return
+ }
+
+ for _, entry := range entries {
+ if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
+ if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
+ return err
+ }
+ continue
+ }
+ if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
+ continue
+ }
+ conf := &filer_pb.RemoteConf{}
+ if err := proto.Unmarshal(entry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
+ }
+ rs.storageNameToConf[conf.Name] = conf
+ }
+ return nil
+}
+
+func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
+ mappings := &filer_pb.RemoteStorageMapping{}
+ if err := proto.Unmarshal(data, mappings); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
+ }
+ for dir, storageLocation := range mappings.Mappings {
+ rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
+ }
+ return nil
+}
+
+func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
+ rs.rules.Put([]byte(dir+"/"), loc)
+}
+
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
+ rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+ mountDir = util.FullPath(string(key[:len(key)-1]))
+ remoteLocation = value.(*filer_pb.RemoteStorageLocation)
+ return true
+ })
+ return
+}
+
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+ var storageLocation *filer_pb.RemoteStorageLocation
+ rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+ storageLocation = value.(*filer_pb.RemoteStorageLocation)
+ return true
+ })
+
+ if storageLocation == nil {
+ found = false
+ return
+ }
+
+ return rs.GetRemoteStorageClient(storageLocation.Name)
+}
+
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+ remoteConf, found = rs.storageNameToConf[storageName]
+ if !found {
+ return
+ }
+
+ var err error
+ if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
+ found = true
+ return
+ }
+ return
+}
+
+func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
+ mappings = &filer_pb.RemoteStorageMapping{
+ Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
+ }
+ if len(oldContent) > 0 {
+ if err = proto.Unmarshal(oldContent, mappings); err != nil {
+ glog.Warningf("unmarshal existing mappings: %v", err)
+ }
+ }
+ return
+}
+
+func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ // ignore the unmarshal error and fall back to the empty mapping already returned
+ }
+
+ // set the new mapping
+ mappings.Mappings[dir] = storageLocation
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
+
+func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ return nil, unmarshalErr
+ }
+
+ // set the new mapping
+ delete(mappings.Mappings, dir)
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
+
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+ if readErr != nil {
+ return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+
+ return
+}
+
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ // unmarshal storage configuration
+ conf = &filer_pb.RemoteConf{}
+ if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
+ readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+ return
+ }
+
+ return
+}
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go
new file mode 100644
index 000000000..35ffc7538
--- /dev/null
+++ b/weed/filer/filer_remote_storage_test.go
@@ -0,0 +1,34 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
+ conf := &filer_pb.RemoteConf{
+ Name: "s7",
+ Type: "s3",
+ }
+ rs := NewFilerRemoteStorage()
+ rs.storageNameToConf[conf.Name] = conf
+
+ rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
+ Name: "s7",
+ Bucket: "some",
+ Path: "/dir",
+ })
+
+ _, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
+ assert.Equal(t, true, found, "find storage client")
+
+ _, _, found2 := rs.FindRemoteStorageClient("/a/b")
+ assert.Equal(t, false, found2, "should not find storage client")
+
+ _, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
+ assert.Equal(t, false, found3, "should not find storage client")
+
+ _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
+ assert.Equal(t, false, found4, "should not find storage client")
+}
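These tests pin down the matching semantics: mapDirectoryToRemoteStorage stores the mount directory with a trailing "/" as the trie key, so only paths strictly below the mount point match; the mount directory itself ("/a/b/c") and look-alike siblings ("/a/b/cc") do not.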
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 2ee29be25..2e0336da8 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
+ "math"
"path/filepath"
"strings"
)
@@ -27,6 +28,10 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
return true
})
+ if limit == math.MaxInt64 {
+ limit = math.MaxInt64 - 1
+ }
+
hasMore = int64(len(entries)) >= limit+1
if hasMore {
entries = entries[:limit]
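The clamp above guards a signed-overflow edge case: when limit == math.MaxInt64 (as passed by LoadRemoteStorageConfigurationsAndMapping above), limit+1 wraps around to math.MinInt64, making the hasMore comparison always true. A two-line illustration:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        limit := int64(math.MaxInt64)
        fmt.Println(limit + 1)           // -9223372036854775808 (math.MinInt64)
        fmt.Println(int64(0) >= limit+1) // true: hasMore would always be set
    }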
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index a5b2f25de..38927d6fb 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -43,4 +43,5 @@ type FilerStore interface {
type BucketAware interface {
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
+ CanDropWholeBucket() bool
}
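A sketch of how a store opts in; the store type here is hypothetical, and only stores that can delete a bucket wholesale (e.g. the per-bucket-table SQL stores above) should return true:

    package main

    import "github.com/chrislusf/seaweedfs/weed/filer"

    // myStore is a hypothetical store that keeps one table per bucket.
    type myStore struct{}

    func (s *myStore) OnBucketCreation(bucket string) { /* create the bucket's table */ }
    func (s *myStore) OnBucketDeletion(bucket string) { /* drop the bucket's table */ }
    func (s *myStore) CanDropWholeBucket() bool       { return true }

    // Compile-time check against the extended interface.
    var _ filer.BucketAware = (*myStore)(nil)

    func main() {}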
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 5175a87a1..2470f340c 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -23,6 +23,7 @@ type VirtualFilerStore interface {
AddPathSpecificStore(path string, storeId string, store FilerStore)
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
+ CanDropWholeBucket() bool
}
type FilerStoreWrapper struct {
@@ -42,6 +43,13 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
}
}
+func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool {
+ if ba, ok := fsw.defaultStore.(BucketAware); ok {
+ return ba.CanDropWholeBucket()
+ }
+ return false
+}
+
func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
for _, store := range fsw.storeIdToStore {
if ba, ok := store.(BucketAware); ok {
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 124d61c1c..4c4409c4d 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -5,12 +5,14 @@ import (
"context"
"crypto/md5"
"fmt"
+ "io"
+ "os"
+
"github.com/syndtr/goleveldb/leveldb"
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
- "io"
- "os"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -47,6 +49,7 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
+ Filter: filter.NewBloomFilter(8), // false positive rate 0.02
}
for d := 0; d < dbCount; d++ {
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index d1cdfbbf6..bc57a6605 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -5,15 +5,17 @@ import (
"context"
"crypto/md5"
"fmt"
- "github.com/syndtr/goleveldb/leveldb"
- leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/opt"
- leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
"io"
"os"
"strings"
"sync"
+ "github.com/syndtr/goleveldb/leveldb"
+ leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/filter"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -62,17 +64,19 @@ func (store *LevelDB3Store) initialize(dir string) (err error) {
}
func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
-
+ bloom := filter.NewBloomFilter(8) // false positive rate 0.02
opts := &opt.Options{
BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
+ Filter: bloom,
}
if name != DEFAULT {
opts = &opt.Options{
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 4,
+ Filter: bloom,
}
}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 241e99a1a..913cbd454 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -34,7 +34,7 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
grpcDialOption: grpcDialOption,
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
- t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
+ t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
t.ListenersCond.Broadcast()
})
return t
@@ -118,6 +118,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
}
for {
+ glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
diff --git a/weed/filer/mongodb/mongodb_store_kv.go b/weed/filer/mongodb/mongodb_store_kv.go
index 4aa9c3a33..59b8f1d93 100644
--- a/weed/filer/mongodb/mongodb_store_kv.go
+++ b/weed/filer/mongodb/mongodb_store_kv.go
@@ -7,6 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
+ "go.mongodb.org/mongo-driver/mongo/options"
)
func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
@@ -15,11 +16,11 @@ func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte)
c := store.connect.Database(store.database).Collection(store.collectionName)
- _, err = c.InsertOne(ctx, Model{
- Directory: dir,
- Name: name,
- Meta: value,
- })
+ opts := options.Update().SetUpsert(true)
+ filter := bson.D{{"directory", dir}, {"name", name}}
+ update := bson.D{{"$set", bson.D{{"meta", value}}}}
+
+ _, err = c.UpdateOne(ctx, filter, update, opts)
if err != nil {
return fmt.Errorf("kv put: %v", err)
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
new file mode 100644
index 000000000..77ca81f15
--- /dev/null
+++ b/weed/filer/read_remote.go
@@ -0,0 +1,45 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (entry *Entry) IsInRemoteOnly() bool {
+ return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.RemoteSize > 0
+}
+
+func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data []byte, err error) {
+ client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
+ if !found {
+ return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
+ }
+
+ mountDir, remoteLocation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
+
+ sourceLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLocation, entry.FullPath)
+
+ return client.ReadFile(sourceLoc, offset, size)
+}
+
+func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, fp util.FullPath) *filer_pb.RemoteStorageLocation {
+ remoteLocation := &filer_pb.RemoteStorageLocation{
+ Name: remoteMountedLocation.Name,
+ Bucket: remoteMountedLocation.Bucket,
+ Path: remoteMountedLocation.Path,
+ }
+ remoteLocation.Path += string(fp)[len(localMountedDir):]
+ return remoteLocation
+}
+
+func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *filer_pb.RemoteConf, remoteLocation *filer_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
+ Directory: string(parent),
+ Name: entry.Name,
+ })
+ return err
+ })
+}
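MapFullPathToRemoteStorageLocation simply rebases a local path onto the remote mount's path. A minimal sketch reusing the hypothetical mount from the remote-storage tests above:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/filer"
        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func main() {
        // Hypothetical mount: local /buckets/b1 backed by bucket "some" under /dir.
        mounted := &filer_pb.RemoteStorageLocation{Name: "s7", Bucket: "some", Path: "/dir"}
        loc := filer.MapFullPathToRemoteStorageLocation(
            util.FullPath("/buckets/b1"), mounted,
            util.FullPath("/buckets/b1/sub/file.txt"))
        fmt.Println(loc.Path) // /dir/sub/file.txt
    }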
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index c4c90fb63..14e8cab1e 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -2,13 +2,9 @@ package filer
import (
"bytes"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "io/ioutil"
"math"
- "net/http"
"time"
)
@@ -31,50 +27,17 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
}
-func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
-
- target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
-
- data, _, err := util.Get(target)
-
- return data, err
-}
-
-func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
- var target string
- if port == 0 {
- target = fmt.Sprintf("http://%s%s/%s", host, dir, name)
- } else {
- target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
- }
-
- // set the HTTP method, url, and request body
- req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
- if err != nil {
- return err
- }
-
- // set the request header Content-Type for json
- if contentType != "" {
- req.Header.Set("Content-Type", contentType)
- }
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return err
+func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) {
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
}
- defer util.CloseResponse(resp)
-
- b, err := ioutil.ReadAll(resp.Body)
+ respLookupEntry, err := filer_pb.LookupEntry(filerClient, request)
if err != nil {
- return err
- }
-
- if resp.StatusCode >= 400 {
- return fmt.Errorf("%s: %s %v", target, resp.Status, string(b))
+ return
}
-
- return nil
-
+ content = respLookupEntry.Entry.Content
+ return
}
func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 379a18c62..729da7d9b 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -24,18 +24,21 @@ func init() {
type options struct {
opt *gorocksdb.Options
+ bto *gorocksdb.BlockBasedTableOptions
ro *gorocksdb.ReadOptions
wo *gorocksdb.WriteOptions
}
func (opt *options) init() {
opt.opt = gorocksdb.NewDefaultOptions()
+ opt.bto = gorocksdb.NewDefaultBlockBasedTableOptions()
opt.ro = gorocksdb.NewDefaultReadOptions()
opt.wo = gorocksdb.NewDefaultWriteOptions()
}
func (opt *options) close() {
opt.opt.Destroy()
+ opt.bto.Destroy()
opt.ro.Destroy()
opt.wo.Destroy()
}
@@ -69,6 +72,11 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
store.opt.SetCompactionFilter(NewTTLFilter())
// store.opt.SetMaxBackgroundCompactions(2)
+ // https://github.com/tecbot/gorocksdb/issues/132
+ store.bto.SetFilterPolicy(gorocksdb.NewBloomFilterFull(8))
+ store.opt.SetBlockBasedTableFactory(store.bto)
+ // store.opt.EnableStatistics()
+
store.db, err = gorocksdb.OpenDb(store.opt, dir)
return
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 92387fb09..55c976915 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -4,6 +4,7 @@ import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
"io"
)
@@ -14,7 +15,7 @@ func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfigura
return nil
}
-func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error {
+func ProtoToText(writer io.Writer, config proto.Message) error {
m := jsonpb.Marshaler{
EmitDefaults: false,
diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go
index 65cc49840..da7d9c9f1 100644
--- a/weed/filer/s3iam_conf_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -44,7 +44,7 @@ func TestS3Conf(t *testing.T) {
},
}
var buf bytes.Buffer
- err := S3ConfigurationToText(&buf, s3Conf)
+ err := ProtoToText(&buf, s3Conf)
assert.Equal(t, err, nil)
s3ConfSaved := &iam_pb.S3ApiConfiguration{}
err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 9b7009df9..803c71afa 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,4 +1,4 @@
-// +build !linux,!darwin,!windows
+// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
// limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 70a278ca5..503e6b23f 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math"
+ "sort"
"strings"
"time"
@@ -88,56 +89,77 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
- chunkViews []*ChunkView
- logicOffset int64
- buffer []byte
- bufferOffset int64
- bufferPos int
- chunkIndex int
- lookupFileId wdclient.LookupFileIdFunctionType
+ chunkViews []*ChunkView
+ totalSize int64
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferPos int
+ nextChunkViewIndex int
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
+var _ = io.ReaderAt(&ChunkStreamReader{})
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
-
- lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
- return masterClient.LookupFileId(fileId)
- }
+func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ sort.Slice(chunkViews, func(i, j int) bool {
+ return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
+ })
+
+ var totalSize int64
+ for _, chunk := range chunkViews {
+ totalSize += int64(chunk.Size)
+ }
return &ChunkStreamReader{
chunkViews: chunkViews,
lookupFileId: lookupFileIdFn,
+ totalSize: totalSize,
}
}
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ return doNewChunkStreamReader(lookupFileIdFn, chunks)
+}
+
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
lookupFileIdFn := LookupFn(filerClient)
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ return doNewChunkStreamReader(lookupFileIdFn, chunks)
+}
- return &ChunkStreamReader{
- chunkViews: chunkViews,
- lookupFileId: lookupFileIdFn,
+func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
+	// honor the requested offset rather than the reader's current position
+	if err = c.prepareBufferFor(off); err != nil {
+		return
	}
+	c.logicOffset = off
+	return c.Read(p)
}
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
for n < len(p) {
if c.isBufferEmpty() {
- if c.chunkIndex >= len(c.chunkViews) {
+ if c.nextChunkViewIndex >= len(c.chunkViews) {
return n, io.EOF
}
- chunkView := c.chunkViews[c.chunkIndex]
- c.fetchChunkToBuffer(chunkView)
- c.chunkIndex++
+ chunkView := c.chunkViews[c.nextChunkViewIndex]
+ if err = c.fetchChunkToBuffer(chunkView); err != nil {
+ return
+ }
+ c.nextChunkViewIndex++
}
t := copy(p[n:], c.buffer[c.bufferPos:])
c.bufferPos += t
n += t
+ c.logicOffset += int64(t)
}
return
}
@@ -148,36 +170,53 @@ func (c *ChunkStreamReader) isBufferEmpty() bool {
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
- var totalSize int64
- for _, chunk := range c.chunkViews {
- totalSize += int64(chunk.Size)
- }
-
var err error
switch whence {
case io.SeekStart:
case io.SeekCurrent:
- offset += c.bufferOffset + int64(c.bufferPos)
+ offset += c.logicOffset
case io.SeekEnd:
- offset = totalSize + offset
+ offset = c.totalSize + offset
}
- if offset > totalSize {
+ if offset > c.totalSize {
err = io.ErrUnexpectedEOF
+ } else {
+ c.logicOffset = offset
}
- for i, chunk := range c.chunkViews {
- if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
- if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
- c.fetchChunkToBuffer(chunk)
- c.chunkIndex = i + 1
- break
- }
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
+ // stay in the same chunk
+ if !c.isBufferEmpty() {
+ if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
+ c.bufferPos = int(offset - c.bufferOffset)
+ return nil
}
}
- c.bufferPos = int(offset - c.bufferOffset)
- return offset, err
+ // need to seek to a different chunk
+	// sort.Search needs a predicate that flips from false to true over the
+	// sorted chunk views; find the first chunk starting after offset, then
+	// step back to the chunk that may cover it.
+	currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
+		return offset < c.chunkViews[i].LogicOffset
+	}) - 1
+	if currentChunkIndex < 0 {
+		return io.EOF
+	}
+ // positioning within the new chunk
+ chunk := c.chunkViews[currentChunkIndex]
+ if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+ if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+ if err = c.fetchChunkToBuffer(chunk); err != nil {
+ return
+ }
+ c.nextChunkViewIndex = currentChunkIndex + 1
+ }
+ c.bufferPos = int(offset - c.bufferOffset)
+ }
+ return
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {