aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorvadimartynov <166398828+vadimartynov@users.noreply.github.com>2024-06-14 21:40:34 +0300
committerGitHub <noreply@github.com>2024-06-14 11:40:34 -0700
commit8aae82dd71735df1cd915a47a9b9d453d393adf6 (patch)
treec9bb9a876485fddfd17c84c857e2cb97a46dfc5d /weed/command
parent2a169dde9a4dab7d3b66f1be47b29d42b730f3a6 (diff)
downloadseaweedfs-8aae82dd71735df1cd915a47a9b9d453d393adf6.tar.xz
seaweedfs-8aae82dd71735df1cd915a47a9b9d453d393adf6.zip
Added context for the MasterClient's methods to avoid endless loops (#5628)
* Added context for the MasterClient's methods to avoid endless loops * Returned WithClient function. Added WithClientCustomGetMaster function * Hid unused ctx arguments * Using a common context for the KeepConnectedToMaster and WaitUntilConnected functions * Changed the context termination check in the tryConnectToMaster function * Added a child context to the tryConnectToMaster function * Added a common context for KeepConnectedToMaster and WaitUntilConnected functions in benchmark
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go3
-rw-r--r--weed/command/benchmark.go8
-rw-r--r--weed/command/download.go3
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/upload.go4
7 files changed, 15 insertions, 10 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index bbb6c6724..a8be4838e 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -74,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
- lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
+ lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 7f9a23cf8..2a0db47c2 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -2,6 +2,7 @@ package command
import (
"bufio"
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
@@ -128,8 +129,9 @@ func runBenchmark(cmd *Command, args []string) bool {
}
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
- go b.masterClient.KeepConnectedToMaster()
- b.masterClient.WaitUntilConnected()
+ ctx := context.Background()
+ go b.masterClient.KeepConnectedToMaster(ctx)
+ b.masterClient.WaitUntilConnected(ctx)
if *b.write {
benchWrite()
@@ -210,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
}
var jwtAuthorization security.EncodedJwt
if isSecure {
- jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid)
+ jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
diff --git a/weed/command/download.go b/weed/command/download.go
index de33643fc..060be9f14 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"io"
"net/http"
@@ -50,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for _, fid := range args {
- if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
+ if e := downloadToFile(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 83c5b167e..df5e002c5 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -472,7 +472,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(func() pb.ServerAddress {
+ operation.DeleteFiles(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
diff --git a/weed/command/master.go b/weed/command/master.go
index 6a32b8abe..f80d8faeb 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"net/http"
"os"
@@ -218,7 +219,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}()
}
- go ms.MasterClient.KeepConnectedToMaster()
+ go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
var (
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index 64583c602..7217aff0b 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -140,7 +140,7 @@ func startMasterFollower(masterOptions MasterOptions) {
}
go grpcS.Serve(grpcL)
- go ms.MasterClient.KeepConnectedToMaster()
+ go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
httpS := &http.Server{Handler: r}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 1f03f7b5a..3e6b8f9a2 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -97,7 +97,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -119,7 +119,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
return false
}
- results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
if err != nil {
fmt.Println(err.Error())
return false