aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server.go')
-rw-r--r--weed/server/filer_grpc_server.go73
1 files changed, 65 insertions, 8 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 17e32731c..48e9253f0 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) {
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ return "", err
+ }
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
+ if !found || len(locations) == 0 {
+ return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil
+}
+
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
glog.V(4).Infof("CreateEntry %v", req)
resp = &filer_pb.CreateEntryResponse{}
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry)
+ if err2 != nil {
+ return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
+ }
if req.Entry.Attributes == nil {
glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
@@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
}, req.OExcl, req.IsFromOtherCluster)
if createErr == nil {
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
} else {
glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
resp.Error = createErr.Error()
@@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- // remove old chunks if not included in the new ones
- unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
-
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry)
+ if err2 != nil {
+ return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
+ }
newEntry := &filer2.Entry{
FullPath: util.JoinPath(req.Directory, req.Entry.Name),
@@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(unusedChunks)
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
} else {
glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
@@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
+func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+ chunks = newEntry.Chunks
+
+ // remove old chunks if not included in the new ones
+ if existingEntry != nil {
+ garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+ if err != nil {
+ return chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+ }
+ }
+
+ // files with manifest chunks are usually large and append only, skip calculating covered chunks
+ var coveredChunks []*filer_pb.FileChunk
+ if !filer2.HasChunkManifest(newEntry.Chunks) {
+ chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks)
+ garbage = append(garbage, coveredChunks...)
+ }
+
+ chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+ newEntry.Attributes.Replication,
+ newEntry.Attributes.Collection,
+ "",
+ needle.SecondsToTTL(newEntry.Attributes.TtlSec),
+ false), chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+ return
+}
+
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
glog.V(4).Infof("AppendToEntry %v", req)
@@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
entry.Chunks = append(entry.Chunks, req.Chunks...)
+ entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+ entry.Replication,
+ entry.Collection,
+ "",
+ needle.SecondsToTTL(entry.TtlSec),
+ false), entry.Chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+
err = fs.filer.CreateEntry(context.Background(), entry, false, false)
return &filer_pb.AppendToEntryResponse{}, err