aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-01-20 01:48:12 -0800
committerchrislu <chris.lu@gmail.com>2023-01-20 01:48:12 -0800
commit81fdf3651b1f60642fc15bd2b55ed0bd31afac15 (patch)
tree19129015db907153d6aa89058c621e2bf93a6bae /weed/command
parentb04865974905c2b31eb23b966df6386172e5ba50 (diff)
downloadseaweedfs-81fdf3651b1f60642fc15bd2b55ed0bd31afac15.tar.xz
seaweedfs-81fdf3651b1f60642fc15bd2b55ed0bd31afac15.zip
grpc connection to filer add sw-client-id header
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_cat.go2
-rw-r--r--weed/command/filer_copy.go12
-rw-r--r--weed/command/filer_meta_backup.go2
-rw-r--r--weed/command/filer_remote_gateway.go2
-rw-r--r--weed/command/filer_remote_sync.go2
-rw-r--r--weed/command/filer_sync.go4
-rw-r--r--weed/command/iam.go2
-rw-r--r--weed/command/s3.go2
-rw-r--r--weed/command/webdav.go2
9 files changed, 16 insertions, 14 deletions
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index c310b2b43..2ef3bfc33 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -96,7 +96,7 @@ func runFilerCat(cmd *Command, args []string) bool {
writer = f
}
- pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pb.WithFilerClient(false, util.RandomInt32(), filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 0c4626317..4cef053fc 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -159,6 +159,7 @@ func runCopy(cmd *Command, args []string) bool {
worker := FileCopyWorker{
options: &copy,
filerAddress: filerAddress,
+ signature: util.RandomInt32(),
}
if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
@@ -172,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
- err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -225,6 +226,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
type FileCopyWorker struct {
options *CopyOptions
filerAddress pb.ServerAddress
+ signature int32
}
func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
@@ -302,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
- err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@@ -368,7 +370,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano()))
}
- if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -479,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
- if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -569,7 +571,7 @@ var _ = filer_pb.FilerClient(&FileCopyWorker{})
func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
filerGrpcAddress := worker.filerAddress.ToGrpcAddress()
- err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
+ err = pb.WithGrpcClient(streamingMode, worker.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, false, worker.options.grpcDialOption)
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index e0f23ee27..f2cba9382 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -225,7 +225,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index 6446d28d0..f7b1f3146 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -35,7 +35,7 @@ type RemoteGatewayOptions struct {
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index d22fd57f8..261e024a6 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -26,7 +26,7 @@ type RemoteSyncOptions struct {
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 992b9dd4e..efef6250e 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -304,7 +304,7 @@ func getSignaturePrefixByPath(path string) string {
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
- readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@@ -330,7 +330,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
- return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 43234aa70..95964994f 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -50,7 +50,7 @@ func (iamopt *IamOptions) startIamServer() bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 369340151..39d1c6fce 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -163,7 +163,7 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
- err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 987fc388e..67e6ce69c 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -87,7 +87,7 @@ func (wo *WebDavOption) startWebDav() bool {
var cipher bool
// connect to filer
for {
- err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)