diff options
Diffstat (limited to 'weed/server/filer_grpc_server.go')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 71 |
1 files changed, 16 insertions, 55 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index e3f4bb161..7b04e4fab 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -156,7 +156,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr resp = &filer_pb.CreateEntryResponse{} - chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry) if err2 != nil { return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } @@ -190,7 +190,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry) if err2 != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) } @@ -240,7 +240,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } -func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { // remove old chunks if not included in the new ones if existingEntry != nil { @@ -257,14 +257,13 @@ func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer garbage = append(garbage, coveredChunks...) if newEntry.Attributes != nil { - so := &filer.StorageOption{ - Replication: newEntry.Attributes.Replication, - Collection: newEntry.Attributes.Collection, - DataCenter: "", - Rack: "", - TtlSeconds: newEntry.Attributes.TtlSec, - Fsync: false, - } + so := fs.detectStorageOption(fullpath, + newEntry.Attributes.Collection, + newEntry.Attributes.Replication, + newEntry.Attributes.TtlSec, + "", + "", + ) chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks) if err != nil { // not good, but should be ok @@ -283,7 +282,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo fullpath := util.NewFullPath(req.Directory, req.EntryName) var offset int64 = 0 - entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) + entry, err := fs.filer.FindEntry(ctx, fullpath) if err == filer_pb.ErrNotFound { entry = &filer.Entry{ FullPath: fullpath, @@ -305,14 +304,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.Chunks, req.Chunks...) - so := &filer.StorageOption{ - Replication: entry.Replication, - Collection: entry.Collection, - DataCenter: "", - Rack: "", - TtlSeconds: entry.TtlSec, - Fsync: false, - } + so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "") entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks) if err != nil { // not good, but should be ok @@ -338,41 +330,10 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - ttlStr := "" - if req.TtlSec > 0 { - ttlStr = strconv.Itoa(int(req.TtlSec)) - } - collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication) - - var altRequest *operation.VolumeAssignRequest + so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack) - dataCenter := req.DataCenter - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := req.Rack - if rack == "" { - rack = fs.option.Rack - } + assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - assignRequest := &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: dataCenter, - Rack: rack, - } - if dataCenter != "" || rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: "", - Rack: "", - } - } assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) @@ -389,8 +350,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Url: assignResult.Url, PublicUrl: assignResult.PublicUrl, Auth: string(assignResult.Auth), - Collection: collection, - Replication: replication, + Collection: so.Collection, + Replication: so.Replication, }, nil } |
