diff options
Diffstat (limited to 'weed/server/filer_grpc_server.go')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 131 |
1 files changed, 92 insertions, 39 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index feab11c79..b904c1393 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -19,7 +19,11 @@ import ( func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) + if err == filer_pb.ErrNotFound { + return &filer_pb.LookupDirectoryEntryResponse{}, nil + } if err != nil { + glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err) return nil, err } @@ -29,32 +33,33 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L IsDirectory: entry.IsDirectory(), Attributes: filer2.EntryAttributeToPb(entry), Chunks: entry.Chunks, + Extended: entry.Extended, }, }, nil } -func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) { +func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { limit := int(req.Limit) if limit == 0 { limit = fs.option.DirListingLimit } - paginationLimit := 1024 + paginationLimit := filer2.PaginationSize if limit < paginationLimit { paginationLimit = limit } - resp := &filer_pb.ListEntriesResponse{} lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + if err != nil { - return nil, err + return err } if len(entries) == 0 { - return resp, nil + return nil } includeLastFile = false @@ -69,15 +74,21 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie } } - resp.Entries = append(resp.Entries, &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer2.EntryAttributeToPb(entry), - }) + if err := stream.Send(&filer_pb.ListEntriesResponse{ + Entry: &filer_pb.Entry{ + Name: entry.Name(), + IsDirectory: entry.IsDirectory(), + Chunks: entry.Chunks, + Attributes: filer2.EntryAttributeToPb(entry), + Extended: entry.Extended, + }, + }); err != nil { + return err + } + limit-- if limit == 0 { - return resp, nil + return nil } } @@ -87,7 +98,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie } - return resp, nil + return nil } func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { @@ -123,24 +134,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { + resp = &filer_pb.CreateEntryResponse{} + fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) if req.Entry.Attributes == nil { - return nil, fmt.Errorf("can not create entry with empty attributes") + glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) + resp.Error = fmt.Sprintf("can not create entry with empty attributes") + return } - err = fs.filer.CreateEntry(ctx, &filer2.Entry{ + createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{ FullPath: fullpath, Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, - }) + }, req.OExcl) - if err == nil { - fs.filer.DeleteChunks(fullpath, garbages) + if createErr == nil { + fs.filer.DeleteChunks(garbages) + } else { + glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) + resp.Error = createErr.Error() } - return &filer_pb.CreateEntryResponse{}, err + return } func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { @@ -159,12 +177,14 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr newEntry := &filer2.Entry{ FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))), Attr: entry.Attr, + Extended: req.Entry.Extended, Chunks: chunks, } - glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v", + glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", fullpath, entry.Attr, len(entry.Chunks), entry.Chunks, - req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks) + req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks, + entry.Extended, req.Entry.Extended) if req.Entry.Attributes != nil { if req.Entry.Attributes.Mtime != 0 { @@ -186,8 +206,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(entry.FullPath, unusedChunks) - fs.filer.DeleteChunks(entry.FullPath, garbages) + fs.filer.DeleteChunks(unusedChunks) + fs.filer.DeleteChunks(garbages) + } else { + glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } fs.filer.NotifyUpdateEvent(entry, newEntry, true) @@ -197,7 +219,30 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) - return &filer_pb.DeleteEntryResponse{}, err + resp = &filer_pb.DeleteEntryResponse{} + if err != nil { + resp.Error = err.Error() + } + return resp, nil +} + +func (fs *FilerServer) StreamDeleteEntries(stream filer_pb.SeaweedFiler_StreamDeleteEntriesServer) error { + for { + req, err := stream.Recv() + if err != nil { + return fmt.Errorf("receive delete entry request: %v", err) + } + fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))) + err = fs.filer.DeleteEntryMetaAndData(context.Background(), fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) + resp := &filer_pb.DeleteEntryResponse{} + if err != nil { + resp.Error = err.Error() + } + if err := stream.Send(resp); err != nil { + return err + } + } + return nil } func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { @@ -206,6 +251,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if req.TtlSec > 0 { ttlStr = strconv.Itoa(int(req.TtlSec)) } + collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) var altRequest *operation.VolumeAssignRequest @@ -216,41 +262,45 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest := &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: dataCenter, } if dataCenter != "" { altRequest = &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: "", } } assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { - return nil, fmt.Errorf("assign volume: %v", err) + glog.V(3).Infof("AssignVolume: %v", err) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil } if assignResult.Error != "" { - return nil, fmt.Errorf("assign volume result: %v", assignResult.Error) + glog.V(3).Infof("AssignVolume error: %v", assignResult.Error) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil } return &filer_pb.AssignVolumeResponse{ - FileId: assignResult.Fid, - Count: int32(assignResult.Count), - Url: assignResult.Url, - PublicUrl: assignResult.PublicUrl, - Auth: string(assignResult.Auth), - }, err + FileId: assignResult.Fid, + Count: int32(assignResult.Count), + Url: assignResult.Url, + PublicUrl: assignResult.PublicUrl, + Auth: string(assignResult.Auth), + Collection: collection, + Replication: replication, + }, nil } func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { - err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{ + err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) return err @@ -286,5 +336,8 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + DirQueues: fs.filer.DirQueuesPath, + Cipher: fs.filer.Cipher, }, nil } |
