aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server.go')
-rw-r--r--weed/server/filer_grpc_server.go131
1 files changed, 92 insertions, 39 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index feab11c79..b904c1393 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -19,7 +19,11 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
+ if err == filer_pb.ErrNotFound {
+ return &filer_pb.LookupDirectoryEntryResponse{}, nil
+ }
if err != nil {
+ glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
return nil, err
}
@@ -29,32 +33,33 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
IsDirectory: entry.IsDirectory(),
Attributes: filer2.EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
},
}, nil
}
-func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
- paginationLimit := 1024
+ paginationLimit := filer2.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
- resp := &filer_pb.ListEntriesResponse{}
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+
if err != nil {
- return nil, err
+ return err
}
if len(entries) == 0 {
- return resp, nil
+ return nil
}
includeLastFile = false
@@ -69,15 +74,21 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
}
- resp.Entries = append(resp.Entries, &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- })
+ if err := stream.Send(&filer_pb.ListEntriesResponse{
+ Entry: &filer_pb.Entry{
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer2.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ },
+ }); err != nil {
+ return err
+ }
+
limit--
if limit == 0 {
- return resp, nil
+ return nil
}
}
@@ -87,7 +98,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
- return resp, nil
+ return nil
}
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
@@ -123,24 +134,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
+ resp = &filer_pb.CreateEntryResponse{}
+
fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)))
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
if req.Entry.Attributes == nil {
- return nil, fmt.Errorf("can not create entry with empty attributes")
+ glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
+ resp.Error = fmt.Sprintf("can not create entry with empty attributes")
+ return
}
- err = fs.filer.CreateEntry(ctx, &filer2.Entry{
+ createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
FullPath: fullpath,
Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks,
- })
+ }, req.OExcl)
- if err == nil {
- fs.filer.DeleteChunks(fullpath, garbages)
+ if createErr == nil {
+ fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
+ resp.Error = createErr.Error()
}
- return &filer_pb.CreateEntryResponse{}, err
+ return
}
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
@@ -159,12 +177,14 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
newEntry := &filer2.Entry{
FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))),
Attr: entry.Attr,
+ Extended: req.Entry.Extended,
Chunks: chunks,
}
- glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
+ glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
- req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
+ req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks,
+ entry.Extended, req.Entry.Extended)
if req.Entry.Attributes != nil {
if req.Entry.Attributes.Mtime != 0 {
@@ -186,8 +206,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
- fs.filer.DeleteChunks(entry.FullPath, garbages)
+ fs.filer.DeleteChunks(unusedChunks)
+ fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
@@ -197,7 +219,30 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
- return &filer_pb.DeleteEntryResponse{}, err
+ resp = &filer_pb.DeleteEntryResponse{}
+ if err != nil {
+ resp.Error = err.Error()
+ }
+ return resp, nil
+}
+
+func (fs *FilerServer) StreamDeleteEntries(stream filer_pb.SeaweedFiler_StreamDeleteEntriesServer) error {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ return fmt.Errorf("receive delete entry request: %v", err)
+ }
+ fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))
+ err = fs.filer.DeleteEntryMetaAndData(context.Background(), fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
+ resp := &filer_pb.DeleteEntryResponse{}
+ if err != nil {
+ resp.Error = err.Error()
+ }
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
@@ -206,6 +251,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
+ collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
@@ -216,41 +262,45 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: dataCenter,
}
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: "",
}
}
assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
- return nil, fmt.Errorf("assign volume: %v", err)
+ glog.V(3).Infof("AssignVolume: %v", err)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
}
if assignResult.Error != "" {
- return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
+ glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
- Auth: string(assignResult.Auth),
- }, err
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
+ Collection: collection,
+ Replication: replication,
+ }, nil
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
- err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
return err
@@ -286,5 +336,8 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ DirQueues: fs.filer.DirQueuesPath,
+ Cipher: fs.filer.Cipher,
}, nil
}