aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/cassandra/cassandra_store.go2
-rw-r--r--weed/filer/filechunk_manifest.go11
-rw-r--r--weed/filer/filechunks.go12
-rw-r--r--weed/filer/filechunks_test.go4
-rw-r--r--weed/filesys/filehandle.go2
-rw-r--r--weed/replication/sink/filersink/filer_sink.go5
-rw-r--r--weed/server/filer_server_handlers.go8
-rw-r--r--weed/server/filer_server_handlers_read.go2
-rw-r--r--weed/server/webdav_server.go2
-rw-r--r--weed/shell/command_volume_fsck.go4
10 files changed, 29 insertions, 23 deletions
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index d917da518..7affab9ec 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -124,7 +124,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
var data []byte
if err := store.session.Query(
"SELECT meta FROM filemeta WHERE directory=? AND name=?",
- dir, name).Consistency(gocql.LocalOne).Scan(&data); err != nil {
+ dir, name).Scan(&data); err != nil {
if err != gocql.ErrNotFound {
return nil, filer_pb.ErrNotFound
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index c709dc819..0c6b0cfe3 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -16,7 +16,7 @@ import (
)
const (
- ManifestBatch = 1000
+ ManifestBatch = 10000
)
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
@@ -39,9 +39,14 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
+
+ if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
+ continue
+ }
+
if !chunk.IsChunkManifest {
dataChunks = append(dataChunks, chunk)
continue
@@ -54,7 +59,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
manifestChunks = append(manifestChunks, chunk)
// recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
+ dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
return chunks, nil, subErr
}
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 346eb3cfb..0dc03f6e2 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -53,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
fileIds := make(map[string]bool)
for _, interval := range visibles {
@@ -72,11 +72,11 @@ func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks
func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
+ aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
if aErr != nil {
return nil, aErr
}
- bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs)
+ bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
if bErr != nil {
return nil, bErr
}
@@ -117,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
return ViewFromVisibleIntervals(visibles, offset, size)
@@ -221,9 +221,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
- chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
+ chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
index 699e7e298..b0ea20848 100644
--- a/weed/filer/filechunks_test.go
+++ b/weed/filer/filechunks_test.go
@@ -90,7 +90,7 @@ func TestRandomFileChunksCompact(t *testing.T) {
}
}
- visibles, _ := NonOverlappingVisibleIntervals(nil, chunks)
+ visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
for _, v := range visibles {
for x := v.start; x < v.stop; x++ {
@@ -227,7 +227,7 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks)
+ intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
for x, interval := range intervals {
log.Printf("test case %d, interval %d, start=%d, stop=%d, fileId=%s",
i, x, interval.start, interval.stop, interval.fileId)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index f95051f65..9acede330 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -130,7 +130,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error
if fh.entryViewCache == nil {
- fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
+ fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 608103469..3898f2c58 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
+ "math"
"google.golang.org/grpc"
@@ -228,11 +229,11 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
}
func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks)
+ aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks, 0, math.MaxInt64)
if aErr != nil {
return nil, nil, aErr
}
- bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks)
+ bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks, 0, math.MaxInt64)
if bErr != nil {
return nil, nil, bErr
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 56a47c860..0389e1e18 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -35,11 +35,11 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
- fs.GetOrHeadHandler(w, r, true)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
- fs.GetOrHeadHandler(w, r, false)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
stats.FilerRequestCounter.WithLabelValues("delete").Inc()
@@ -95,11 +95,11 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
- fs.GetOrHeadHandler(w, r, true)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
- fs.GetOrHeadHandler(w, r, false)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 153f68f8f..957e08855 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -21,7 +21,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
+func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
isForDirectory := strings.HasSuffix(path, "/")
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index c6550a36f..68c1f3233 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -532,7 +532,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
f.reader = nil
}
if f.reader == nil {
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 2ced0f571..bd3be4d89 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -164,7 +164,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
if verbose && entry.Entry.IsDirectory {
fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
}
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}
@@ -311,7 +311,7 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
files[i.vid].Write(buffer)
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}