aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-22 20:04:56 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-22 20:04:56 -0700
commit3fa1f150d9644056344b8ca822254de8fcbd203b (patch)
treedca95c0f5e17491697d476b1a0ef62fd8fbb2535
parent1babec00e70fc194fa40951162db907cf0e363cd (diff)
downloadseaweedfs-3fa1f150d9644056344b8ca822254de8fcbd203b.tar.xz
seaweedfs-3fa1f150d9644056344b8ca822254de8fcbd203b.zip
refactoring
-rw-r--r--weed/filer2/filechunks.go16
-rw-r--r--weed/filer2/filer_deletion.go2
-rw-r--r--weed/filesys/wfs_deletion.go2
-rw-r--r--weed/replication/sink/filersink/fetch_write.go8
-rw-r--r--weed/replication/sink/filersink/filer_sink.go22
-rw-r--r--weed/s3api/filer_multipart.go2
-rw-r--r--weed/server/filer_grpc_server.go2
-rw-r--r--weed/server/filer_server_handlers_read.go2
8 files changed, 20 insertions, 36 deletions
diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go
index 6c3157e6c..b5876df82 100644
--- a/weed/filer2/filechunks.go
+++ b/weed/filer2/filechunks.go
@@ -40,7 +40,7 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
fileIds[interval.fileId] = true
}
for _, chunk := range chunks {
- if found := fileIds[chunk.FileId]; found {
+ if _, found := fileIds[chunk.GetFileIdString()]; found {
compacted = append(compacted, chunk)
} else {
garbage = append(garbage, chunk)
@@ -50,15 +50,15 @@ func CompactFileChunks(chunks []*filer_pb.FileChunk) (compacted, garbage []*file
return
}
-func FindUnusedFileChunks(oldChunks, newChunks []*filer_pb.FileChunk) (unused []*filer_pb.FileChunk) {
+func MinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
fileIds := make(map[string]bool)
- for _, interval := range newChunks {
- fileIds[interval.FileId] = true
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
}
- for _, chunk := range oldChunks {
- if found := fileIds[chunk.FileId]; !found {
- unused = append(unused, chunk)
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetFileIdString()]; !found {
+ delta = append(delta, chunk)
}
}
@@ -123,7 +123,7 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.
newV := newVisibleInterval(
chunk.Offset,
chunk.Offset+int64(chunk.Size),
- chunk.FileId,
+ chunk.GetFileIdString(),
chunk.Mtime,
true,
)
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index f8b09adf6..fea93d57f 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -54,7 +54,7 @@ func (f *Filer) loopProcessingDeletion() {
func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
- f.fileIdDeletionChan <- chunk.FileId
+ f.fileIdDeletionChan <- chunk.GetFileIdString()
}
}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index 1378b6122..6e586b7df 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -17,7 +17,7 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu
var fileIds []string
for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.FileId)
+ fileIds = append(fileIds, chunk.GetFileIdString())
}
wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d24770e3d..97e9671a3 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -39,7 +39,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
fileId, err := fs.fetchAndWrite(ctx, sourceChunk)
if err != nil {
- return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
+ return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
}
return &filer_pb.FileChunk{
@@ -48,15 +48,15 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
- SourceFileId: sourceChunk.FileId,
+ SourceFileId: sourceChunk.GetFileIdString(),
}, nil
}
func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
- filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.FileId)
+ filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString())
if err != nil {
- return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
+ return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
defer readCloser.Close()
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index ff0fe8b74..f99c7fdf6 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -179,7 +179,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks)
+ existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
@@ -207,23 +207,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
}
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
- deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
- newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
- return
-}
-
-func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
- for _, a := range as {
- found := false
- for _, b := range bs {
- if a.FileId == b.FileId {
- found = true
- break
- }
- }
- if !found {
- delta = append(delta, a)
- }
- }
+ deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
+ newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index e6af085fd..0739b2220 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -69,7 +69,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.C
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
for _, chunk := range entry.Chunks {
p := &filer_pb.FileChunk{
- FileId: chunk.FileId,
+ FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
Mtime: chunk.Mtime,
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index bcac4dbde..f8bb9cf07 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -140,7 +140,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
// remove old chunks if not included in the new ones
- unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
+ unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 1d18bd75e..c7cb4f0cc 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -78,7 +78,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
- fileId := entry.Chunks[0].FileId
+ fileId := entry.Chunks[0].GetFileIdString()
urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
if err != nil {