aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/filer_pb
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/filer_pb')
-rw-r--r--weed/pb/filer_pb/filer_client.go56
-rw-r--r--weed/pb/filer_pb/filer_client_bfs.go2
-rw-r--r--weed/pb/filer_pb/filer_pb_helper.go12
3 files changed, 35 insertions, 35 deletions
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 35cea71b3..60a7c637c 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -20,12 +20,12 @@ var (
)
type FilerClient interface {
- WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error
+ WithFilerClient(streamingMode bool, fn func(SeaweedFilerClient) error) error // 15 implementation
AdjustedUrl(location *Location) string
GetDataCenter() string
}
-func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
+func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
dir, name := fullFilePath.DirAndName()
@@ -37,7 +37,7 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
}
// glog.V(3).Infof("read %s request: %v", fullFilePath, request)
- resp, err := LookupEntry(client, request)
+ resp, err := LookupEntry(ctx, client, request)
if err != nil {
glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
return err
@@ -57,7 +57,7 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
type EachEntryFunction func(entry *Entry, isLast bool) error
-func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction) (err error) {
+func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction) (err error) {
var counter uint32
var startFrom string
@@ -69,13 +69,13 @@ func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefi
var paginationLimit uint32 = 10000
- if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
+ if err = doList(ctx, filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
return err
}
for counter == paginationLimit {
counter = 0
- if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
+ if err = doList(ctx, filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
return err
}
}
@@ -83,23 +83,23 @@ func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefi
return nil
}
-func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
+func List(ctx context.Context, filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
- return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+ return doSeaweedList(ctx, client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
})
}
-func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
+func doList(ctx context.Context, filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
- return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
+ return doSeaweedList(ctx, client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
})
}
-func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
- return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
+func SeaweedList(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
+ return doSeaweedList(ctx, client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
-func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
+func doSeaweedList(ctx context.Context, client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunction, startFrom string, inclusive bool, limit uint32) (err error) {
// Redundancy limit to make it correctly judge whether it is the last file.
redLimit := limit
@@ -118,7 +118,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
}
glog.V(4).Infof("read directory: %v", request)
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.ListEntries(ctx, request)
if err != nil {
@@ -156,7 +156,7 @@ func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix
return nil
}
-func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
+func Exists(ctx context.Context, filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
err = filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
@@ -166,7 +166,7 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin
}
glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
- resp, err := LookupEntry(client, request)
+ resp, err := LookupEntry(ctx, client, request)
if err != nil {
if err == ErrNotFound {
exists = false
@@ -184,7 +184,7 @@ func Exists(filerClient FilerClient, parentDirectoryPath string, entryName strin
return
}
-func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) {
+func Touch(ctx context.Context, filerClient FilerClient, parentDirectoryPath string, entryName string, entry *Entry) (err error) {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
@@ -194,7 +194,7 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string
}
glog.V(4).Infof("touch entry %v/%v: %v", parentDirectoryPath, entryName, request)
- if err := UpdateEntry(client, request); err != nil {
+ if err := UpdateEntry(ctx, client, request); err != nil {
glog.V(0).Infof("touch exists entry %v: %v", request, err)
return fmt.Errorf("touch exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
}
@@ -204,13 +204,13 @@ func Touch(filerClient FilerClient, parentDirectoryPath string, entryName string
}
-func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
+func Mkdir(ctx context.Context, filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
- return DoMkdir(client, parentDirectoryPath, dirName, fn)
+ return DoMkdir(ctx, client, parentDirectoryPath, dirName, fn)
})
}
-func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
+func DoMkdir(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
entry := &Entry{
Name: dirName,
IsDirectory: true,
@@ -233,7 +233,7 @@ func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName stri
}
glog.V(1).Infof("mkdir: %v", request)
- if err := CreateEntry(client, request); err != nil {
+ if err := CreateEntry(ctx, client, request); err != nil {
glog.V(0).Infof("mkdir %v: %v", request, err)
return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
}
@@ -241,7 +241,7 @@ func DoMkdir(client SeaweedFilerClient, parentDirectoryPath string, dirName stri
return nil
}
-func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
+func MkFile(ctx context.Context, filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
entry := &Entry{
@@ -267,7 +267,7 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string
}
glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
- if err := CreateEntry(client, request); err != nil {
+ if err := CreateEntry(ctx, client, request); err != nil {
glog.V(0).Infof("create file %v:%v", request, err)
return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
}
@@ -276,13 +276,13 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string
})
}
-func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
+func Remove(ctx context.Context, filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
return filerClient.WithFilerClient(false, func(client SeaweedFilerClient) error {
- return DoRemove(client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
+ return DoRemove(ctx, client, parentDirectoryPath, name, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster, signatures)
})
}
-func DoRemove(client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error {
+func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPath string, name string, isDeleteData bool, isRecursive bool, ignoreRecursiveErr bool, isFromOtherCluster bool, signatures []int32) error {
deleteEntryRequest := &DeleteEntryRequest{
Directory: parentDirectoryPath,
Name: name,
@@ -292,7 +292,7 @@ func DoRemove(client SeaweedFilerClient, parentDirectoryPath string, name string
IsFromOtherCluster: isFromOtherCluster,
Signatures: signatures,
}
- if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
+ if resp, err := client.DeleteEntry(ctx, deleteEntryRequest); err != nil {
if strings.Contains(err.Error(), ErrNotFound.Error()) {
return nil
}
diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go
index e43443706..55d492aca 100644
--- a/weed/pb/filer_pb/filer_client_bfs.go
+++ b/weed/pb/filer_pb/filer_client_bfs.go
@@ -52,7 +52,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue[util.FullPath], jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
- return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
+ return ReadDirAllEntries(context.Background(), filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
fn(parentPath, entry)
diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go
index 0ec31420c..b45d999ad 100644
--- a/weed/pb/filer_pb/filer_pb_helper.go
+++ b/weed/pb/filer_pb/filer_pb_helper.go
@@ -108,8 +108,8 @@ func AfterEntryDeserialization(chunks []*FileChunk) {
}
}
-func CreateEntry(client SeaweedFilerClient, request *CreateEntryRequest) error {
- resp, err := client.CreateEntry(context.Background(), request)
+func CreateEntry(ctx context.Context, client SeaweedFilerClient, request *CreateEntryRequest) error {
+ resp, err := client.CreateEntry(ctx, request)
if err != nil {
glog.V(1).Infof("create entry %s/%s %v: %v", request.Directory, request.Entry.Name, request.OExcl, err)
return fmt.Errorf("CreateEntry: %v", err)
@@ -121,8 +121,8 @@ func CreateEntry(client SeaweedFilerClient, request *CreateEntryRequest) error {
return nil
}
-func UpdateEntry(client SeaweedFilerClient, request *UpdateEntryRequest) error {
- _, err := client.UpdateEntry(context.Background(), request)
+func UpdateEntry(ctx context.Context, client SeaweedFilerClient, request *UpdateEntryRequest) error {
+ _, err := client.UpdateEntry(ctx, request)
if err != nil {
glog.V(1).Infof("update entry %s/%s :%v", request.Directory, request.Entry.Name, err)
return fmt.Errorf("UpdateEntry: %v", err)
@@ -130,8 +130,8 @@ func UpdateEntry(client SeaweedFilerClient, request *UpdateEntryRequest) error {
return nil
}
-func LookupEntry(client SeaweedFilerClient, request *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error) {
- resp, err := client.LookupDirectoryEntry(context.Background(), request)
+func LookupEntry(ctx context.Context, client SeaweedFilerClient, request *LookupDirectoryEntryRequest) (*LookupDirectoryEntryResponse, error) {
+ resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
return nil, ErrNotFound