diff options
Diffstat (limited to 'weed/server/filer_grpc_server.go')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 204 |
1 files changed, 156 insertions, 48 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 72d5b2e1d..b904c1393 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -5,22 +5,26 @@ import ( "fmt" "os" "path/filepath" + "strconv" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "strconv" - "strings" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { - entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name))) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) + if err == filer_pb.ErrNotFound { + return &filer_pb.LookupDirectoryEntryResponse{}, nil + } if err != nil { - return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err) + glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err) + return nil, err } return &filer_pb.LookupDirectoryEntryResponse{ @@ -29,27 +33,33 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L IsDirectory: entry.IsDirectory(), Attributes: filer2.EntryAttributeToPb(entry), Chunks: entry.Chunks, + Extended: entry.Extended, }, }, nil } -func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) { +func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { limit := int(req.Limit) if limit == 0 { limit = fs.option.DirListingLimit } - resp := &filer_pb.ListEntriesResponse{} + paginationLimit := filer2.PaginationSize + if limit < paginationLimit { + paginationLimit = limit + } + lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, limit) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + if err != nil { - return nil, err + return err } if len(entries) == 0 { - return resp, nil + return nil } includeLastFile = false @@ -64,18 +74,31 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie } } - resp.Entries = append(resp.Entries, &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer2.EntryAttributeToPb(entry), - }) + if err := stream.Send(&filer_pb.ListEntriesResponse{ + Entry: &filer_pb.Entry{ + Name: entry.Name(), + IsDirectory: entry.IsDirectory(), + Chunks: entry.Chunks, + Attributes: filer2.EntryAttributeToPb(entry), + Extended: entry.Extended, + }, + }); err != nil { + return err + } + limit-- + if limit == 0 { + return nil + } + } + + if len(entries) < paginationLimit { + break } } - return resp, nil + return nil } func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { @@ -91,7 +114,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return nil, err } var locs []*filer_pb.Location - for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) { + locations, found := fs.filer.MasterClient.GetLocations(uint32(vid)) + if !found { + continue + } + for _, loc := range locations { locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, @@ -107,45 +134,57 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { - fullpath := filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)) + resp = &filer_pb.CreateEntryResponse{} + + fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) - fs.filer.DeleteChunks(garbages) + if req.Entry.Attributes == nil { + glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) + resp.Error = fmt.Sprintf("can not create entry with empty attributes") + return + } - err = fs.filer.CreateEntry(&filer2.Entry{ + createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{ FullPath: fullpath, Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, - }) + }, req.OExcl) - if err == nil { + if createErr == nil { + fs.filer.DeleteChunks(garbages) + } else { + glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) + resp.Error = createErr.Error() } - return &filer_pb.CreateEntryResponse{}, err + return } func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { - fullpath := filepath.Join(req.Directory, req.Entry.Name) - entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath)) + fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath)) if err != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } // remove old chunks if not included in the new ones - unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks) + unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) newEntry := &filer2.Entry{ - FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)), + FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))), Attr: entry.Attr, + Extended: req.Entry.Extended, Chunks: chunks, } - glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v", + glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", fullpath, entry.Attr, len(entry.Chunks), entry.Chunks, - req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks) + req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks, + entry.Extended, req.Entry.Extended) if req.Entry.Attributes != nil { if req.Entry.Attributes.Mtime != 0 { @@ -157,6 +196,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr newEntry.Attr.Uid = req.Entry.Attributes.Uid newEntry.Attr.Gid = req.Entry.Attributes.Gid newEntry.Attr.Mime = req.Entry.Attributes.Mime + newEntry.Attr.UserName = req.Entry.Attributes.UserName + newEntry.Attr.GroupNames = req.Entry.Attributes.GroupName } @@ -164,9 +205,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } - if err = fs.filer.UpdateEntry(newEntry); err == nil { + if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { fs.filer.DeleteChunks(unusedChunks) fs.filer.DeleteChunks(garbages) + } else { + glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } fs.filer.NotifyUpdateEvent(entry, newEntry, true) @@ -175,8 +218,31 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { - err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData) - return &filer_pb.DeleteEntryResponse{}, err + err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) + resp = &filer_pb.DeleteEntryResponse{} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +func (fs *FilerServer) StreamDeleteEntries(stream filer_pb.SeaweedFiler_StreamDeleteEntriesServer) error { + for { + req, err := stream.Recv() + if err != nil { + return fmt.Errorf("receive delete entry request: %v", err) + } + fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))) + err = fs.filer.DeleteEntryMetaAndData(context.Background(), fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) + resp := &filer_pb.DeleteEntryResponse{} + if err != nil { + resp.Error = err.Error() + } + if err := stream.Send(resp); err != nil { + return err + } + } + return nil } func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { @@ -185,6 +251,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if req.TtlSec > 0 { ttlStr = strconv.Itoa(int(req.TtlSec)) } + collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) var altRequest *operation.VolumeAssignRequest @@ -195,41 +262,82 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest := &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: dataCenter, } if dataCenter != "" { altRequest = &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: "", } } - assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest) + assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { - return nil, fmt.Errorf("assign volume: %v", err) + glog.V(3).Infof("AssignVolume: %v", err) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil } if assignResult.Error != "" { - return nil, fmt.Errorf("assign volume result: %v", assignResult.Error) + glog.V(3).Infof("AssignVolume error: %v", assignResult.Error) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil } return &filer_pb.AssignVolumeResponse{ - FileId: assignResult.Fid, - Count: int32(assignResult.Count), - Url: assignResult.Url, - PublicUrl: assignResult.PublicUrl, - }, err + FileId: assignResult.Fid, + Count: int32(assignResult.Count), + Url: assignResult.Url, + PublicUrl: assignResult.PublicUrl, + Auth: string(assignResult.Auth), + Collection: collection, + Replication: replication, + }, nil } func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { - for _, master := range fs.option.Masters { - _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection)) - } + err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ + Name: req.GetCollection(), + }) + return err + }) return &filer_pb.DeleteCollectionResponse{}, err } + +func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { + + input := &master_pb.StatisticsRequest{ + Replication: req.Replication, + Collection: req.Collection, + Ttl: req.Ttl, + } + + output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input) + if err != nil { + return nil, err + } + + return &filer_pb.StatisticsResponse{ + TotalSize: output.TotalSize, + UsedSize: output.UsedSize, + FileCount: output.FileCount, + }, nil +} + +func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + + return &filer_pb.GetFilerConfigurationResponse{ + Masters: fs.option.Masters, + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + DirQueues: fs.filer.DirQueuesPath, + Cipher: fs.filer.Cipher, + }, nil +} |
