Diffstat (limited to 'weed/filer')
29 files changed, 539 insertions, 163 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index ab8f6bcbd..4bf9b16fa 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -32,6 +32,9 @@ type AbstractSqlStore struct {
 	dbsLock sync.Mutex
 }
 
+func (store *AbstractSqlStore) CanDropWholeBucket() bool {
+	return store.SupportBucketTable
+}
 func (store *AbstractSqlStore) OnBucketCreation(bucket string) {
 	store.dbsLock.Lock()
 	defer store.dbsLock.Unlock()
@@ -277,7 +280,6 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
 	}
 
 	glog.V(4).Infof("delete %s SQL %s %d", string(shortPath), store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)))
-
 	res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), string(shortPath))
 	if err != nil {
 		return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
@@ -287,7 +289,6 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
 	if err != nil {
 		return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
 	}
-
 	return nil
 }
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fd2ce91a6..fc0b52ac7 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -32,6 +32,7 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
 		configuration.GetString(prefix+"username"),
 		configuration.GetString(prefix+"password"),
 		configuration.GetStringSlice(prefix+"superLargeDirectories"),
+		configuration.GetString(prefix+"localDC"),
 	)
 }
 
@@ -40,13 +41,19 @@ func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string,
 	return
 }
 
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) {
 	store.cluster = gocql.NewCluster(hosts...)
 	if username != "" && password != "" {
 		store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
 	}
 	store.cluster.Keyspace = keyspace
+	fallback := gocql.RoundRobinHostPolicy()
+	if localDC != "" {
+		fallback = gocql.DCAwareRoundRobinPolicy(localDC)
+	}
+	store.cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
 	store.cluster.Consistency = gocql.LocalQuorum
+
 	store.session, err = store.cluster.CreateSession()
 	if err != nil {
 		glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
@@ -117,7 +124,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
 	var data []byte
 	if err := store.session.Query(
 		"SELECT meta FROM filemeta WHERE directory=? AND name=?",
-		dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+		dir, name).Scan(&data); err != nil {
 		if err != gocql.ErrNotFound {
 			return nil, filer_pb.ErrNotFound
 		}
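The new localDC option above feeds gocql's host selection: token-aware routing with a round-robin fallback that is pinned to one datacenter when configured. A minimal sketch of the same wiring in isolation, assuming only the standard gocql API (the package and function names here are illustrative):

package cassandrastore

import "github.com/gocql/gocql"

// newSession mirrors the policy setup above: prefer the replicas that own
// the token, and fall back to round-robin, restricted to the local
// datacenter when one is configured.
func newSession(hosts []string, keyspace, localDC string) (*gocql.Session, error) {
	cluster := gocql.NewCluster(hosts...)
	cluster.Keyspace = keyspace
	fallback := gocql.RoundRobinHostPolicy()
	if localDC != "" {
		fallback = gocql.DCAwareRoundRobinPolicy(localDC)
	}
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
	cluster.Consistency = gocql.LocalQuorum
	return cluster.CreateSession()
}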
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index b7c8370e6..8fa75fe6b 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -42,6 +42,7 @@ type Entry struct {
 	HardLinkId      HardLinkId
 	HardLinkCounter int32
 	Content         []byte
+	Remote          *filer_pb.RemoteEntry
 }
 
 func (entry *Entry) Size() uint64 {
@@ -56,20 +57,55 @@ func (entry *Entry) Timestamp() time.Time {
 	}
 }
 
+func (entry *Entry) ShallowClone() *Entry {
+	if entry == nil {
+		return nil
+	}
+	newEntry := &Entry{}
+	newEntry.FullPath = entry.FullPath
+	newEntry.Attr = entry.Attr
+	newEntry.Chunks = entry.Chunks
+	newEntry.Extended = entry.Extended
+	newEntry.HardLinkId = entry.HardLinkId
+	newEntry.HardLinkCounter = entry.HardLinkCounter
+	newEntry.Content = entry.Content
+	newEntry.Remote = entry.Remote
+
+	return newEntry
+}
+
 func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
 	if entry == nil {
 		return nil
 	}
-	return &filer_pb.Entry{
-		Name:            entry.FullPath.Name(),
-		IsDirectory:     entry.IsDirectory(),
-		Attributes:      EntryAttributeToPb(entry),
-		Chunks:          entry.Chunks,
-		Extended:        entry.Extended,
-		HardLinkId:      entry.HardLinkId,
-		HardLinkCounter: entry.HardLinkCounter,
-		Content:         entry.Content,
+	message := &filer_pb.Entry{}
+	message.Name = entry.FullPath.Name()
+	entry.ToExistingProtoEntry(message)
+	return message
+}
+
+func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
+	if entry == nil {
+		return
 	}
+	message.IsDirectory = entry.IsDirectory()
+	message.Attributes = EntryAttributeToPb(entry)
+	message.Chunks = entry.Chunks
+	message.Extended = entry.Extended
+	message.HardLinkId = entry.HardLinkId
+	message.HardLinkCounter = entry.HardLinkCounter
+	message.Content = entry.Content
+	message.RemoteEntry = entry.Remote
+}
+
+func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
+	fsEntry.Attr = PbToEntryAttribute(message.Attributes)
+	fsEntry.Chunks = message.Chunks
+	fsEntry.Extended = message.Extended
+	fsEntry.HardLinkId = HardLinkId(message.HardLinkId)
+	fsEntry.HardLinkCounter = message.HardLinkCounter
+	fsEntry.Content = message.Content
+	fsEntry.Remote = message.RemoteEntry
 }
 
 func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
@@ -83,26 +119,11 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
 	}
 }
 
-func (entry *Entry) Clone() *Entry {
-	return &Entry{
-		FullPath:        entry.FullPath,
-		Attr:            entry.Attr,
-		Chunks:          entry.Chunks,
-		Extended:        entry.Extended,
-		HardLinkId:      entry.HardLinkId,
-		HardLinkCounter: entry.HardLinkCounter,
-	}
-}
-
 func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
-	return &Entry{
-		FullPath:        util.NewFullPath(dir, entry.Name),
-		Attr:            PbToEntryAttribute(entry.Attributes),
-		Chunks:          entry.Chunks,
-		HardLinkId:      HardLinkId(entry.HardLinkId),
-		HardLinkCounter: entry.HardLinkCounter,
-		Content:         entry.Content,
-	}
+	t := &Entry{}
+	t.FullPath = util.NewFullPath(dir, entry.Name)
+	FromPbEntryToExistingEntry(entry, t)
+	return t
 }
 
 func maxUint64(x, y uint64) uint64 {
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 4c613f068..55c937b39 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -12,14 +12,8 @@ import (
 )
 
 func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
-	message := &filer_pb.Entry{
-		Attributes:      EntryAttributeToPb(entry),
-		Chunks:          entry.Chunks,
-		Extended:        entry.Extended,
-		HardLinkId:      entry.HardLinkId,
-		HardLinkCounter: entry.HardLinkCounter,
-		Content:         entry.Content,
-	}
+	message := &filer_pb.Entry{}
+	entry.ToExistingProtoEntry(message)
 	return proto.Marshal(message)
 }
 
@@ -31,15 +25,7 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
 		return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
 	}
 
-	entry.Attr = PbToEntryAttribute(message.Attributes)
-
-	entry.Extended = message.Extended
-
-	entry.Chunks = message.Chunks
-
-	entry.HardLinkId = message.HardLinkId
-	entry.HardLinkCounter = message.HardLinkCounter
-	entry.Content = message.Content
+	FromPbEntryToExistingEntry(message, entry)
 
 	return nil
 }
@@ -129,6 +115,9 @@ func EqualEntry(a, b *Entry) bool {
 	if !bytes.Equal(a.Content, b.Content) {
 		return false
 	}
+	if !proto.Equal(a.Remote, b.Remote) {
+		return false
+	}
 	return true
 }
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index c709dc819..0c6b0cfe3 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -16,7 +16,7 @@ import (
 )
 
 const (
-	ManifestBatch = 1000
+	ManifestBatch = 10000
 )
 
 func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
@@ -39,9 +39,14 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
 	return
 }
 
-func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
 	// TODO maybe parallel this
 	for _, chunk := range chunks {
+
+		if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
+			continue
+		}
+
 		if !chunk.IsChunkManifest {
 			dataChunks = append(dataChunks, chunk)
 			continue
@@ -54,7 +59,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
 		manifestChunks = append(manifestChunks, chunk)
 		// recursive
-		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
+		dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
 		if subErr != nil {
 			return chunks, nil, subErr
 		}
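The new guard in ResolveChunkManifest skips any chunk whose byte range does not intersect the requested read range, so manifests entirely outside [startOffset, stopOffset) are never fetched or expanded. The test is the usual half-open interval overlap check; a sketch of the predicate (the package already uses int64 max/min helpers alongside the maxUint64 shown above; they are spelled out here for completeness):

package filer

func max(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}

// overlaps reports whether the chunk range [offset, offset+size)
// intersects the read range [start, stop); ResolveChunkManifest
// "continue"s when this is false.
func overlaps(offset, size, start, stop int64) bool {
	return max(offset, start) < min(offset+size, stop)
}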
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 346eb3cfb..0dc03f6e2 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -53,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
 
 func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
 
-	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
 
 	fileIds := make(map[string]bool)
 	for _, interval := range visibles {
@@ -72,11 +72,11 @@ func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
 
-	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+	aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
 	if aErr != nil {
 		return nil, aErr
 	}
-	bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+	bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
 	if bErr != nil {
 		return nil, bErr
 	}
@@ -117,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
 
 func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
 
-	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
 
 	return ViewFromVisibleIntervals(visibles, offset, size)
 
@@ -221,9 +221,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
 
 // NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
 // If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
 
-	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
+	chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
 
 	sort.Slice(chunks, func(i, j int) bool {
 		if chunks[i].Mtime == chunks[j].Mtime {
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
index 699e7e298..b0ea20848 100644
--- a/weed/filer/filechunks_test.go
+++ b/weed/filer/filechunks_test.go
@@ -90,7 +90,7 @@ func TestRandomFileChunksCompact(t *testing.T) {
 		}
 	}
 
-	visibles, _ := NonOverlappingVisibleIntervals(nil, chunks)
+	visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
 
 	for _, v := range visibles {
 		for x := v.start; x < v.stop; x++ {
@@ -227,7 +227,7 @@ func TestIntervalMerging(t *testing.T) {
 
 	for i, testcase := range testcases {
 		log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
-		intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
+		intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
 		for x, interval := range intervals {
 			log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s", i, x, interval.start, interval.stop, interval.fileId)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index effdc0e4e..1a20abefc 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -42,6 +42,7 @@ type Filer struct {
 	MetaAggregator *MetaAggregator
 	Signature      int32
 	FilerConf      *FilerConf
+	RemoteStorage  *FilerRemoteStorage
 }
 
 func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -51,8 +52,9 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
 		fileIdDeletionQueue: util.NewUnboundedQueue(),
 		GrpcDialOption:      grpcDialOption,
 		FilerConf:           NewFilerConf(),
+		RemoteStorage:       NewFilerRemoteStorage(),
 	}
-	f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
+	f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
 	f.metaLogCollection = collection
 	f.metaLogReplication = replication
 
@@ -207,7 +209,7 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
 		Attr: Attr{
 			Mtime:      now,
 			Crtime:     now,
-			Mode:       os.ModeDir | entry.Mode | 0110,
+			Mode:       os.ModeDir | entry.Mode | 0111,
 			Uid:        entry.Uid,
 			Gid:        entry.Gid,
 			Collection: entry.Collection,
"context" "math" + "strings" "sync" "github.com/chrislusf/seaweedfs/weed/glog" @@ -78,6 +79,9 @@ func (f *Filer) isBucket(entry *Entry) bool { if parent != f.DirBucketsPath { return false } + if strings.HasPrefix(dirName, ".") { + return false + } f.buckets.RLock() defer f.buckets.RUnlock() diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go index ab5afc5cc..c58b26dc2 100644 --- a/weed/filer/filer_conf.go +++ b/weed/filer/filer_conf.go @@ -15,6 +15,7 @@ import ( const ( DirectoryEtcRoot = "/etc" DirectoryEtcSeaweedFS = "/etc/seaweedfs" + DirectoryEtcRemote = "/etc/remote" FilerConfName = "filer.conf" IamConfigDirecotry = "/etc/iam" IamIdentityFile = "identity.json" @@ -126,6 +127,9 @@ func mergePathConf(a, b *filer_pb.FilerConf_PathConf) { if b.VolumeGrowthCount > 0 { a.VolumeGrowthCount = b.VolumeGrowthCount } + if b.ReadOnly { + a.ReadOnly = b.ReadOnly + } } func (fc *FilerConf) ToProto() *filer_pb.FilerConf { diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go index ff868a3ec..1576c7d82 100644 --- a/weed/filer/filer_conf_test.go +++ b/weed/filer/filer_conf_test.go @@ -24,6 +24,18 @@ func TestFilerConf(t *testing.T) { LocationPrefix: "/buckets/", Replication: "001", }, + { + LocationPrefix: "/buckets", + ReadOnly: false, + }, + { + LocationPrefix: "/buckets/xxx", + ReadOnly: true, + }, + { + LocationPrefix: "/buckets/xxx/yyy", + ReadOnly: false, + }, }} fc.doLoadConf(conf) @@ -31,4 +43,7 @@ func TestFilerConf(t *testing.T) { assert.Equal(t, "abcd", fc.MatchStorageRule("/buckets/abcd/jasdf").Collection) assert.Equal(t, "001", fc.MatchStorageRule("/buckets/abc/jasdf").Replication) + assert.Equal(t, true, fc.MatchStorageRule("/buckets/xxx/yyy/zzz").ReadOnly) + assert.Equal(t, false, fc.MatchStorageRule("/buckets/other").ReadOnly) + } diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 3ef3cfff9..35187d034 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -71,7 +71,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry lastFileName := "" includeLastFile := false - if !isDeletingBucket { + if !isDeletingBucket || !f.Store.CanDropWholeBucket() { for { entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "") if err != nil { diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index c9f75a5ca..34ac5321a 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -12,6 +12,7 @@ import ( // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) { f.maybeReloadFilerConfiguration(event) + f.maybeReloadRemoteStorageConfigurationAndMapping(event) f.onBucketEvents(event) } @@ -24,10 +25,14 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { } if f.DirBucketsPath == event.Directory { if message.OldEntry == nil && message.NewEntry != nil { - f.Store.OnBucketCreation(message.NewEntry.Name) + if message.NewEntry.IsDirectory { + f.Store.OnBucketCreation(message.NewEntry.Name) + } } if message.OldEntry != nil && message.NewEntry == nil { - f.Store.OnBucketDeletion(message.OldEntry.Name) + if message.OldEntry.IsDirectory { + f.Store.OnBucketDeletion(message.OldEntry.Name) + } } } } @@ -80,3 +85,16 @@ func (f *Filer) LoadFilerConf() { } f.FilerConf = fc } + +//////////////////////////////////// +// 
load and maintain remote storages +//////////////////////////////////// +func (f *Filer) LoadRemoteStorageConfAndMapping() { + if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil { + glog.Errorf("read remote conf and mapping: %v", err) + return + } +} +func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) { + // FIXME add reloading +} diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go new file mode 100644 index 000000000..f09658015 --- /dev/null +++ b/weed/filer/filer_remote_storage.go @@ -0,0 +1,197 @@ +package filer + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "math" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/viant/ptrie" +) + +const REMOTE_STORAGE_CONF_SUFFIX = ".conf" +const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping" + +type FilerRemoteStorage struct { + rules ptrie.Trie + storageNameToConf map[string]*filer_pb.RemoteConf +} + +func NewFilerRemoteStorage() (rs *FilerRemoteStorage) { + rs = &FilerRemoteStorage{ + rules: ptrie.New(), + storageNameToConf: make(map[string]*filer_pb.RemoteConf), + } + return rs +} + +func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) { + // execute this on filer + + entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "") + if err != nil { + if err == filer_pb.ErrNotFound { + return nil + } + glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err) + return + } + + for _, entry := range entries { + if entry.Name() == REMOTE_STORAGE_MOUNT_FILE { + if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil { + return err + } + continue + } + if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &filer_pb.RemoteConf{} + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err) + } + rs.storageNameToConf[conf.Name] = conf + } + return nil +} + +func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) { + mappings := &filer_pb.RemoteStorageMapping{} + if err := proto.Unmarshal(data, mappings); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err) + } + for dir, storageLocation := range mappings.Mappings { + rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation) + } + return nil +} + +func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) { + rs.rules.Put([]byte(dir+"/"), loc) +} + +func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) { + rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool { + mountDir = util.FullPath(string(key[:len(key)-1])) + remoteLocation = value.(*filer_pb.RemoteStorageLocation) + return true + }) + return +} + +func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) { + var storageLocation *filer_pb.RemoteStorageLocation + 
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
new file mode 100644
index 000000000..f09658015
--- /dev/null
+++ b/weed/filer/filer_remote_storage.go
@@ -0,0 +1,197 @@
+package filer
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/remote_storage"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/golang/protobuf/proto"
+	"google.golang.org/grpc"
+	"math"
+	"strings"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/viant/ptrie"
+)
+
+const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
+const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
+
+type FilerRemoteStorage struct {
+	rules             ptrie.Trie
+	storageNameToConf map[string]*filer_pb.RemoteConf
+}
+
+func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
+	rs = &FilerRemoteStorage{
+		rules:             ptrie.New(),
+		storageNameToConf: make(map[string]*filer_pb.RemoteConf),
+	}
+	return rs
+}
+
+func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
+	// execute this on filer
+
+	entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
+	if err != nil {
+		if err == filer_pb.ErrNotFound {
+			return nil
+		}
+		glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
+		return
+	}
+
+	for _, entry := range entries {
+		if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
+			if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
+				return err
+			}
+			continue
+		}
+		if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
+			return nil
+		}
+		conf := &filer_pb.RemoteConf{}
+		if err := proto.Unmarshal(entry.Content, conf); err != nil {
+			return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
+		}
+		rs.storageNameToConf[conf.Name] = conf
+	}
+	return nil
+}
+
+func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
+	mappings := &filer_pb.RemoteStorageMapping{}
+	if err := proto.Unmarshal(data, mappings); err != nil {
+		return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
+	}
+	for dir, storageLocation := range mappings.Mappings {
+		rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
+	}
+	return nil
+}
+
+func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
+	rs.rules.Put([]byte(dir+"/"), loc)
+}
+
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
+	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+		mountDir = util.FullPath(string(key[:len(key)-1]))
+		remoteLocation = value.(*filer_pb.RemoteStorageLocation)
+		return true
+	})
+	return
+}
+
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+	var storageLocation *filer_pb.RemoteStorageLocation
+	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+		storageLocation = value.(*filer_pb.RemoteStorageLocation)
+		return true
+	})
+
+	if storageLocation == nil {
+		found = false
+		return
+	}
+
+	return rs.GetRemoteStorageClient(storageLocation.Name)
+}
+
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+	remoteConf, found = rs.storageNameToConf[storageName]
+	if !found {
+		return
+	}
+
+	var err error
+	if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
+		found = true
+		return
+	}
+	return
+}
+
+func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
+	mappings = &filer_pb.RemoteStorageMapping{
+		Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
+	}
+	if len(oldContent) > 0 {
+		if err = proto.Unmarshal(oldContent, mappings); err != nil {
+			glog.Warningf("unmarshal existing mappings: %v", err)
+		}
+	}
+	return
+}
+
+func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
+	mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+	if unmarshalErr != nil {
+		// skip
+	}
+
+	// set the new mapping
+	mappings.Mappings[dir] = storageLocation
+
+	if newContent, err = proto.Marshal(mappings); err != nil {
+		return oldContent, fmt.Errorf("marshal mappings: %v", err)
+	}
+
+	return
+}
+
+func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
+	mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+	if unmarshalErr != nil {
+		return nil, unmarshalErr
+	}
+
+	// set the new mapping
+	delete(mappings.Mappings, dir)
+
+	if newContent, err = proto.Marshal(mappings); err != nil {
+		return oldContent, fmt.Errorf("marshal mappings: %v", err)
+	}
+
+	return
+}
+
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
+	var oldContent []byte
+	if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+		return readErr
+	}); readErr != nil {
+		return nil, readErr
+	}
+
+	mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+	if readErr != nil {
+		return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+	}
+
+	return
+}
+
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
+	var oldContent []byte
+	if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+		oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
+		return readErr
+	}); readErr != nil {
+		return nil, readErr
+	}
+
+	// unmarshal storage configuration
+	conf = &filer_pb.RemoteConf{}
+	if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
+		readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+		return
+	}
+
+	return
+}
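The mount mapping lives as one serialized protobuf blob under /etc/remote/mount.mapping and is edited through the byte-level helpers above. A usage sketch with made-up directory and location values (normally the blob is read from and written back to the filer):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func main() {
	// Start from an empty mapping blob.
	var content []byte

	// Mount /buckets/media onto a configured remote storage named "s7".
	content, err := filer.AddRemoteStorageMapping(content, "/buckets/media",
		&filer_pb.RemoteStorageLocation{Name: "s7", Bucket: "some", Path: "/dir"})
	if err != nil {
		panic(err)
	}

	mappings, _ := filer.UnmarshalRemoteStorageMappings(content)
	fmt.Println(mappings.Mappings["/buckets/media"]) // name:"s7" bucket:"some" path:"/dir"

	// Unmounting deletes the key and re-serializes the map.
	content, _ = filer.RemoveRemoteStorageMapping(content, "/buckets/media")
}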
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/filer_remote_storage_test.go
new file mode 100644
index 000000000..35ffc7538
--- /dev/null
+++ b/weed/filer/filer_remote_storage_test.go
@@ -0,0 +1,34 @@
+package filer
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
+	conf := &filer_pb.RemoteConf{
+		Name: "s7",
+		Type: "s3",
+	}
+	rs := NewFilerRemoteStorage()
+	rs.storageNameToConf[conf.Name] = conf
+
+	rs.mapDirectoryToRemoteStorage("/a/b/c", &filer_pb.RemoteStorageLocation{
+		Name:   "s7",
+		Bucket: "some",
+		Path:   "/dir",
+	})
+
+	_, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
+	assert.Equal(t, true, found, "find storage client")
+
+	_, _, found2 := rs.FindRemoteStorageClient("/a/b")
+	assert.Equal(t, false, found2, "should not find storage client")
+
+	_, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
+	assert.Equal(t, false, found3, "should not find storage client")
+
+	_, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
+	assert.Equal(t, false, found4, "should not find storage client")
+}
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 2ee29be25..2e0336da8 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -3,6 +3,7 @@ package filer
 import (
 	"context"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"math"
 	"path/filepath"
 	"strings"
 )
@@ -27,6 +28,10 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
 		return true
 	})
 
+	if limit == math.MaxInt64 {
+		limit = math.MaxInt64 - 1
+	}
+
 	hasMore = int64(len(entries)) >= limit+1
 	if hasMore {
 		entries = entries[:limit]
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index a5b2f25de..38927d6fb 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -43,4 +43,5 @@ type FilerStore interface {
 type BucketAware interface {
 	OnBucketCreation(bucket string)
 	OnBucketDeletion(bucket string)
+	CanDropWholeBucket() bool
 }
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 5175a87a1..2470f340c 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -23,6 +23,7 @@ type VirtualFilerStore interface {
 	AddPathSpecificStore(path string, storeId string, store FilerStore)
 	OnBucketCreation(bucket string)
 	OnBucketDeletion(bucket string)
+	CanDropWholeBucket() bool
 }
 
 type FilerStoreWrapper struct {
@@ -42,6 +43,13 @@ func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
 	}
 }
 
+func (fsw *FilerStoreWrapper) CanDropWholeBucket() bool {
+	if ba, ok := fsw.defaultStore.(BucketAware); ok {
+		return ba.CanDropWholeBucket()
+	}
+	return false
+}
+
 func (fsw *FilerStoreWrapper) OnBucketCreation(bucket string) {
 	for _, store := range fsw.storeIdToStore {
 		if ba, ok := store.(BucketAware); ok {
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 124d61c1c..4c4409c4d 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -5,12 +5,14 @@ import (
 	"context"
 	"crypto/md5"
 	"fmt"
+	"io"
+	"os"
+
 	"github.com/syndtr/goleveldb/leveldb"
 	leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+	"github.com/syndtr/goleveldb/leveldb/filter"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
-	"io"
-	"os"
 
 	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -47,6 +49,7 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
 		BlockCacheCapacity:            32 * 1024 * 1024, // default value is 8MiB
 		WriteBuffer:                   16 * 1024 * 1024, // default value is 4MiB
 		CompactionTableSizeMultiplier: 4,
+		Filter:                        filter.NewBloomFilter(8), // false positive rate 0.02
 	}
 
 	for d := 0; d < dbCount; d++ {
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index d1cdfbbf6..bc57a6605 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -5,15 +5,17 @@ import (
 	"context"
 	"crypto/md5"
 	"fmt"
-	"github.com/syndtr/goleveldb/leveldb"
-	leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
-	"github.com/syndtr/goleveldb/leveldb/opt"
-	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
 	"io"
 	"os"
 	"strings"
 	"sync"
 
+	"github.com/syndtr/goleveldb/leveldb"
+	leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+	"github.com/syndtr/goleveldb/leveldb/filter"
+	"github.com/syndtr/goleveldb/leveldb/opt"
+	leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
 	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -62,17 +64,19 @@ func (store *LevelDB3Store) initialize(dir string) (err error) {
 }
 
 func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
-
+	bloom := filter.NewBloomFilter(8) // false positive rate 0.02
 	opts := &opt.Options{
 		BlockCacheCapacity:            32 * 1024 * 1024, // default value is 8MiB
 		WriteBuffer:                   16 * 1024 * 1024, // default value is 4MiB
 		CompactionTableSizeMultiplier: 4,
+		Filter:                        bloom,
 	}
 	if name != DEFAULT {
 		opts = &opt.Options{
 			BlockCacheCapacity:            4 * 1024 * 1024, // default value is 8MiB
 			WriteBuffer:                   2 * 1024 * 1024, // default value is 4MiB
 			CompactionTableSizeMultiplier: 4,
+			Filter:                        bloom,
 		}
 	}
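Both LevelDB stores now attach an 8-bits-per-key bloom filter, which lets point lookups skip SSTables that cannot contain the key (roughly a 2% false-positive rate at 8 bits). The same option shown in isolation, as a sketch against the goleveldb API:

package store

import (
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

// openWithBloom opens a LevelDB database whose tables carry a bloom
// filter, so negative point lookups avoid reading data blocks.
func openWithBloom(dir string) (*leveldb.DB, error) {
	opts := &opt.Options{
		BlockCacheCapacity:            32 * 1024 * 1024,
		WriteBuffer:                   16 * 1024 * 1024,
		CompactionTableSizeMultiplier: 4,
		Filter:                        filter.NewBloomFilter(8),
	}
	return leveldb.OpenFile(dir, opts)
}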
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 241e99a1a..913cbd454 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -34,7 +34,7 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
 		grpcDialOption: grpcDialOption,
 	}
 	t.ListenersCond = sync.NewCond(&t.ListenersLock)
-	t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() {
+	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
 		t.ListenersCond.Broadcast()
 	})
 	return t
@@ -118,6 +118,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
 	}
 
 	for {
+		glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
 		err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
diff --git a/weed/filer/mongodb/mongodb_store_kv.go b/weed/filer/mongodb/mongodb_store_kv.go
index 4aa9c3a33..59b8f1d93 100644
--- a/weed/filer/mongodb/mongodb_store_kv.go
+++ b/weed/filer/mongodb/mongodb_store_kv.go
@@ -7,6 +7,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
@@ -15,11 +16,11 @@ func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte)
 
 	c := store.connect.Database(store.database).Collection(store.collectionName)
 
-	_, err = c.InsertOne(ctx, Model{
-		Directory: dir,
-		Name:      name,
-		Meta:      value,
-	})
+	opts := options.Update().SetUpsert(true)
+	filter := bson.D{{"directory", dir}, {"name", name}}
+	update := bson.D{{"$set", bson.D{{"meta", value}}}}
+
+	_, err = c.UpdateOne(ctx, filter, update, opts)
 
 	if err != nil {
 		return fmt.Errorf("kv put: %v", err)
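KvPut previously used InsertOne, which fails with a duplicate-key error once a (directory, name) row exists; the change switches to an upsert so repeated puts overwrite the stored value. The core pattern as a standalone sketch against the official mongo-driver (the function name upsertMeta is illustrative):

package mongostore

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// upsertMeta inserts the row if (directory, name) is absent and
// overwrites its "meta" field if it is present.
func upsertMeta(ctx context.Context, c *mongo.Collection, dir, name string, value []byte) error {
	opts := options.Update().SetUpsert(true)
	filter := bson.D{{"directory", dir}, {"name", name}}
	update := bson.D{{"$set", bson.D{{"meta", value}}}}
	_, err := c.UpdateOne(ctx, filter, update, opts)
	return err
}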
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
new file mode 100644
index 000000000..77ca81f15
--- /dev/null
+++ b/weed/filer/read_remote.go
@@ -0,0 +1,45 @@
+package filer
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (entry *Entry) IsInRemoteOnly() bool {
+	return len(entry.Chunks) == 0 && entry.Remote != nil && entry.Remote.RemoteSize > 0
+}
+
+func (f *Filer) ReadRemote(entry *Entry, offset int64, size int64) (data []byte, err error) {
+	client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
+	if !found {
+		return nil, fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
+	}
+
+	mountDir, remoteLoation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
+
+	sourceLoc := MapFullPathToRemoteStorageLocation(mountDir, remoteLoation, entry.FullPath)
+
+	return client.ReadFile(sourceLoc, offset, size)
+}
+
+func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMountedLocation *filer_pb.RemoteStorageLocation, fp util.FullPath) *filer_pb.RemoteStorageLocation {
+	remoteLocation := &filer_pb.RemoteStorageLocation{
+		Name:   remoteMountedLocation.Name,
+		Bucket: remoteMountedLocation.Bucket,
+		Path:   remoteMountedLocation.Path,
+	}
+	remoteLocation.Path += string(fp)[len(localMountedDir):]
+	return remoteLocation
+}
+
+func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *filer_pb.RemoteConf, remoteLocation *filer_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
+	return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+		_, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
+			Directory: string(parent),
+			Name:      entry.Name,
+		})
+		return err
+	})
+}
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index c4c90fb63..14e8cab1e 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -2,13 +2,9 @@ package filer
 
 import (
 	"bytes"
-	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
-	"io/ioutil"
 	"math"
-	"net/http"
 	"time"
 )
 
@@ -31,50 +27,17 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
 
 }
 
-func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
-
-	target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
-
-	data, _, err := util.Get(target)
-
-	return data, err
-}
-
-func SaveAs(host string, port int, dir, name string, contentType string, byteBuffer *bytes.Buffer) error {
-	var target string
-	if port == 0 {
-		target = fmt.Sprintf("http://%s%s/%s", host, dir, name)
-	} else {
-		target = fmt.Sprintf("http://%s:%d%s/%s", host, port, dir, name)
-	}
-
-	// set the HTTP method, url, and request body
-	req, err := http.NewRequest(http.MethodPut, target, byteBuffer)
-	if err != nil {
-		return err
-	}
-
-	// set the request header Content-Type for json
-	if contentType != "" {
-		req.Header.Set("Content-Type", contentType)
-	}
-	resp, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return err
+func ReadInsideFiler(filerClient filer_pb.SeaweedFilerClient, dir, name string) (content []byte, err error) {
+	request := &filer_pb.LookupDirectoryEntryRequest{
+		Directory: dir,
+		Name:      name,
 	}
-	defer util.CloseResponse(resp)
-
-	b, err := ioutil.ReadAll(resp.Body)
+	respLookupEntry, err := filer_pb.LookupEntry(filerClient, request)
 	if err != nil {
-		return err
-	}
-
-	if resp.StatusCode >= 400 {
-		return fmt.Errorf("%s: %s %v", target, resp.Status, string(b))
+		return
 	}
-
-	return nil
-
+	content = respLookupEntry.Entry.Content
+	return
 }
 
 func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, content []byte) error {
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 379a18c62..729da7d9b 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -24,18 +24,21 @@ func init() {
 
 type options struct {
 	opt *gorocksdb.Options
+	bto *gorocksdb.BlockBasedTableOptions
 	ro  *gorocksdb.ReadOptions
 	wo  *gorocksdb.WriteOptions
 }
 
 func (opt *options) init() {
 	opt.opt = gorocksdb.NewDefaultOptions()
+	opt.bto = gorocksdb.NewDefaultBlockBasedTableOptions()
 	opt.ro = gorocksdb.NewDefaultReadOptions()
 	opt.wo = gorocksdb.NewDefaultWriteOptions()
 }
 
 func (opt *options) close() {
 	opt.opt.Destroy()
+	opt.bto.Destroy()
 	opt.ro.Destroy()
 	opt.wo.Destroy()
 }
@@ -69,6 +72,11 @@ func (store *RocksDBStore) initialize(dir string) (err error) {
 	store.opt.SetCompactionFilter(NewTTLFilter())
 	// store.opt.SetMaxBackgroundCompactions(2)
 
+	// https://github.com/tecbot/gorocksdb/issues/132
+	store.bto.SetFilterPolicy(gorocksdb.NewBloomFilterFull(8))
+	store.opt.SetBlockBasedTableFactory(store.bto)
+	// store.opt.EnableStatistics()
+
 	store.db, err = gorocksdb.OpenDb(store.opt, dir)
 
 	return
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 92387fb09..55c976915 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
 	"github.com/golang/protobuf/jsonpb"
+	"github.com/golang/protobuf/proto"
 	"io"
 )
 
@@ -14,7 +15,7 @@ func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfigura
 	return nil
 }
 
-func S3ConfigurationToText(writer io.Writer, config *iam_pb.S3ApiConfiguration) error {
+func ProtoToText(writer io.Writer, config proto.Message) error {
 
 	m := jsonpb.Marshaler{
 		EmitDefaults: false,
diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go
index 65cc49840..da7d9c9f1 100644
--- a/weed/filer/s3iam_conf_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -44,7 +44,7 @@ func TestS3Conf(t *testing.T) {
 		},
 	}
 	var buf bytes.Buffer
-	err := S3ConfigurationToText(&buf, s3Conf)
+	err := ProtoToText(&buf, s3Conf)
 	assert.Equal(t, err, nil)
 	s3ConfSaved := &iam_pb.S3ApiConfiguration{}
 	err = ParseS3ConfigurationFromBytes(buf.Bytes(), s3ConfSaved)
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 9b7009df9..803c71afa 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,4 +1,4 @@
-// +build !linux,!darwin,!windows
+// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
 
 // limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 70a278ca5..503e6b23f 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"math"
+	"sort"
 	"strings"
 	"time"
 
@@ -88,56 +89,77 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 
 // ---------------- ChunkStreamReader ----------------------------------
 type ChunkStreamReader struct {
-	chunkViews   []*ChunkView
-	logicOffset  int64
-	buffer       []byte
-	bufferOffset int64
-	bufferPos    int
-	chunkIndex   int
-	lookupFileId wdclient.LookupFileIdFunctionType
+	chunkViews         []*ChunkView
+	totalSize          int64
+	logicOffset        int64
+	buffer             []byte
+	bufferOffset       int64
+	bufferPos          int
+	nextChunkViewIndex int
+	lookupFileId       wdclient.LookupFileIdFunctionType
 }
 
 var _ = io.ReadSeeker(&ChunkStreamReader{})
+var _ = io.ReaderAt(&ChunkStreamReader{})
 
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
-
-	lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
-		return masterClient.LookupFileId(fileId)
-	}
+func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
 	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+	sort.Slice(chunkViews, func(i, j int) bool {
+		return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
+	})
+
+	var totalSize int64
+	for _, chunk := range chunkViews {
+		totalSize += int64(chunk.Size)
+	}
 
 	return &ChunkStreamReader{
 		chunkViews:   chunkViews,
 		lookupFileId: lookupFileIdFn,
+		totalSize:    totalSize,
 	}
 }
 
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+	lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+
+	return doNewChunkStreamReader(lookupFileIdFn, chunks)
+}
+
 func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
 	lookupFileIdFn := LookupFn(filerClient)
 
-	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+	return doNewChunkStreamReader(lookupFileIdFn, chunks)
+}
 
-	return &ChunkStreamReader{
-		chunkViews:   chunkViews,
-		lookupFileId: lookupFileIdFn,
+func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
+	if err = c.prepareBufferFor(c.logicOffset); err != nil {
+		return
 	}
+	return c.Read(p)
 }
 
 func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
 	for n < len(p) {
 		if c.isBufferEmpty() {
-			if c.chunkIndex >= len(c.chunkViews) {
+			if c.nextChunkViewIndex >= len(c.chunkViews) {
 				return n, io.EOF
 			}
-			chunkView := c.chunkViews[c.chunkIndex]
-			c.fetchChunkToBuffer(chunkView)
-			c.chunkIndex++
+			chunkView := c.chunkViews[c.nextChunkViewIndex]
+			if err = c.fetchChunkToBuffer(chunkView); err != nil {
+				return
+			}
+			c.nextChunkViewIndex++
 		}
 		t := copy(p[n:], c.buffer[c.bufferPos:])
 		c.bufferPos += t
 		n += t
+		c.logicOffset += int64(t)
 	}
 	return
 }
@@ -148,36 +170,53 @@ func (c *ChunkStreamReader) isBufferEmpty() bool {
 
 func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
 
-	var totalSize int64
-	for _, chunk := range c.chunkViews {
-		totalSize += int64(chunk.Size)
-	}
-
 	var err error
 	switch whence {
 	case io.SeekStart:
 	case io.SeekCurrent:
-		offset += c.bufferOffset + int64(c.bufferPos)
+		offset += c.logicOffset
 	case io.SeekEnd:
-		offset = totalSize + offset
+		offset = c.totalSize + offset
 	}
-	if offset > totalSize {
+	if offset > c.totalSize {
 		err = io.ErrUnexpectedEOF
+	} else {
+		c.logicOffset = offset
 	}
 
-	for i, chunk := range c.chunkViews {
-		if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
-			if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
-				c.fetchChunkToBuffer(chunk)
-				c.chunkIndex = i + 1
-				break
-			}
+	return offset, err
+
+}
+
+func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
+	// stay in the same chunk
+	if !c.isBufferEmpty() {
+		if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
+			c.bufferPos = int(offset - c.bufferOffset)
+			return nil
 		}
 	}
-	c.bufferPos = int(offset - c.bufferOffset)
 
-	return offset, err
+	// need to seek to a different chunk
+	currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
+		return c.chunkViews[i].LogicOffset <= offset
+	})
+	if currentChunkIndex == len(c.chunkViews) {
+		return io.EOF
+	}
+	// positioning within the new chunk
+	chunk := c.chunkViews[currentChunkIndex]
+	if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+		if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+			if err = c.fetchChunkToBuffer(chunk); err != nil {
+				return
+			}
+			c.nextChunkViewIndex = currentChunkIndex + 1
+		}
+		c.bufferPos = int(offset - c.bufferOffset)
+	}
+	return
 }
 
 func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
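(The stream.go diff is truncated above, inside fetchChunkToBuffer.) For orientation: prepareBufferFor has to locate the chunk that covers a logical offset in chunkViews, which doNewChunkStreamReader now keeps sorted by LogicOffset. One conventional way to phrase that lookup with sort.Search, as a sketch rather than the exact code above:

package filer

import "sort"

type chunkView struct {
	LogicOffset int64
	Size        uint64
}

// findChunk returns the index of the chunk whose half-open range
// [LogicOffset, LogicOffset+Size) contains offset, or -1 if none does.
// views must be sorted by LogicOffset.
func findChunk(views []chunkView, offset int64) int {
	// first chunk whose end lies beyond the offset
	i := sort.Search(len(views), func(i int) bool {
		return views[i].LogicOffset+int64(views[i].Size) > offset
	})
	if i < len(views) && views[i].LogicOffset <= offset {
		return i
	}
	return -1 // offset falls past the end or into a gap
}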
