diff options
| author | chrislu <chris.lu@gmail.com> | 2023-01-20 01:48:12 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-01-20 01:48:12 -0800 |
| commit | 81fdf3651b1f60642fc15bd2b55ed0bd31afac15 (patch) | |
| tree | 19129015db907153d6aa89058c621e2bf93a6bae /weed/command | |
| parent | b04865974905c2b31eb23b966df6386172e5ba50 (diff) | |
| download | seaweedfs-81fdf3651b1f60642fc15bd2b55ed0bd31afac15.tar.xz seaweedfs-81fdf3651b1f60642fc15bd2b55ed0bd31afac15.zip | |
grpc connection to filer add sw-client-id header
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_cat.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 12 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 4 | ||||
| -rw-r--r-- | weed/command/iam.go | 2 | ||||
| -rw-r--r-- | weed/command/s3.go | 2 | ||||
| -rw-r--r-- | weed/command/webdav.go | 2 |
9 files changed, 16 insertions, 14 deletions
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index c310b2b43..2ef3bfc33 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -96,7 +96,7 @@ func runFilerCat(cmd *Command, args []string) bool { writer = f } - pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pb.WithFilerClient(false, util.RandomInt32(), filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 0c4626317..4cef053fc 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -159,6 +159,7 @@ func runCopy(cmd *Command, args []string) bool { worker := FileCopyWorker{ options: ©, filerAddress: filerAddress, + signature: util.RandomInt32(), } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -172,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool { } func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -225,6 +226,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi type FileCopyWorker struct { options *CopyOptions filerAddress pb.ServerAddress + signature int32 } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -302,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -368,7 +370,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano())) } - if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -479,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -569,7 +571,7 @@ var _ = filer_pb.FilerClient(&FileCopyWorker{}) func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { filerGrpcAddress := worker.filerAddress.ToGrpcAddress() - err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { + err = pb.WithGrpcClient(streamingMode, worker.signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, filerGrpcAddress, false, worker.options.grpcDialOption) diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index e0f23ee27..f2cba9382 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -225,7 +225,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 6446d28d0..f7b1f3146 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -35,7 +35,7 @@ type RemoteGatewayOptions struct { var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index d22fd57f8..261e024a6 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -26,7 +26,7 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 992b9dd4e..efef6250e 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -304,7 +304,7 @@ func getSignaturePrefixByPath(path string) string { func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -330,7 +330,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) diff --git a/weed/command/iam.go b/weed/command/iam.go index 43234aa70..95964994f 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -50,7 +50,7 @@ func (iamopt *IamOptions) startIamServer() bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/s3.go b/weed/command/s3.go index 369340151..39d1c6fce 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -163,7 +163,7 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 987fc388e..67e6ce69c 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -87,7 +87,7 @@ func (wo *WebDavOption) startWebDav() bool { var cipher bool // connect to filer for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) |
