diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 73 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 28 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 2 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
5 files changed, 96 insertions, 14 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 17e32731c..48e9253f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -137,13 +138,28 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } +func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) { + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + return "", err + } + locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) + if !found || len(locations) == 0 { + return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + } + return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil +} + func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { glog.V(4).Infof("CreateEntry %v", req) resp = &filer_pb.CreateEntryResponse{} - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + if err2 != nil { + return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) + } if req.Entry.Attributes == nil { glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) @@ -158,7 +174,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr }, req.OExcl, req.IsFromOtherCluster) if createErr == nil { - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) resp.Error = createErr.Error() @@ -177,10 +193,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - // remove old chunks if not included in the new ones - unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) - - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + if err2 != nil { + return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) + } newEntry := &filer2.Entry{ FullPath: util.JoinPath(req.Directory, req.Entry.Name), @@ -214,8 +230,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(unusedChunks) - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } @@ -225,6 +240,37 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } +func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { + chunks = newEntry.Chunks + + // remove old chunks if not included in the new ones + if existingEntry != nil { + garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) + if err != nil { + return chunks, nil, fmt.Errorf("MinusChunks: %v", err) + } + } + + // files with manifest chunks are usually large and append only, skip calculating covered chunks + var coveredChunks []*filer_pb.FileChunk + if !filer2.HasChunkManifest(newEntry.Chunks) { + chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks) + garbage = append(garbage, coveredChunks...) + } + + chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + newEntry.Attributes.Replication, + newEntry.Attributes.Collection, + "", + needle.SecondsToTTL(newEntry.Attributes.TtlSec), + false), chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + return +} + func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { glog.V(4).Infof("AppendToEntry %v", req) @@ -254,6 +300,17 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo entry.Chunks = append(entry.Chunks, req.Chunks...) + entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( + entry.Replication, + entry.Collection, + "", + needle.SecondsToTTL(entry.TtlSec), + false), entry.Chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + err = fs.filer.CreateEntry(context.Background(), entry, false, false) return &filer_pb.AppendToEntryResponse{}, err diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index a642c502a..da66178ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -40,7 +40,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -67,7 +67,6 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) - writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return } @@ -114,7 +113,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 29546542c..be0438efb 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if assignErr != nil { return nil, assignErr } @@ -132,6 +132,12 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r } } + fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } + path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -184,3 +190,23 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) return uploadResult, err } + +func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) + if assignErr != nil { + return nil, "", "", assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + if uploadErr != nil { + return nil, "", "", uploadErr + } + + return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil + } +} + diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 17f35838d..8413496b8 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -19,7 +19,7 @@ import ( func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index e8bedd352..8655daf70 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -474,7 +474,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks) f.reader = nil } if f.reader == nil { |
