diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-26 00:15:03 -0800 |
| commit | 9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch) | |
| tree | 1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/shell | |
| parent | c935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff) | |
| download | seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip | |
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call.
this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/shell')
36 files changed, 55 insertions, 55 deletions
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 5ed1677c8..2d391de1d 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 8942c15da..55c8ddf19 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ return nil } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *collectionName, }) diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index ba502a6b9..b2aa6d2f4 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { var resp *master_pb.CollectionListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: includeNormalVolumes, IncludeEcVolumes: includeEcVolumes, diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 51c8c32cd..765c0ad89 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) targetAddress := pb.NewServerAddressFromDataNode(targetServer.info) - err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetAddress != existingLocation { @@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, @@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index b2ca605c7..288fa4945 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) @@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) { var resp *master_pb.LookupVolumeResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds}) return err }) @@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 6add14749..fcdee264e 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 409ec4329..f5d1166d2 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index a5731240d..17e9c6550 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index ece805db3..73bb8e5c6 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes()) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index e0525defa..a7de6d3ef 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W dir, name := util.FullPath(path).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 46dc07e9a..7fe4cf809 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go index 11b663eec..71a35cb68 100644 --- a/weed/shell/command_fs_mkdir.go +++ b/weed/shell/command_fs_mkdir.go @@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri dir, name := util.FullPath(path).DirAndName() - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ Directory: dir, diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 2448c8f61..612cdc84e 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go index b383366ca..2ce3275bb 100644 --- a/weed/shell/command_fs_rm.go +++ b/weed/shell/command_fs_rm.go @@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer return fmt.Errorf("need to have arguments") } - commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { for _, entry := range entiries { targetPath, err := commandEnv.parseUrl(entry) if err != nil { diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go index 3fa905237..c892c3443 100644 --- a/weed/shell/command_remote_configure.go +++ b/weed/shell/command_remote_configure.go @@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error { - return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ Directory: filer.DirectoryEtcRemote, @@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write return err } - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data) }); err != nil && err != filer_pb.ErrNotFound { return err diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index 277c4c2be..e09a66761 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache) - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir) diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go index 2b57db707..019bc93c9 100644 --- a/weed/shell/command_remote_mount.go +++ b/weed/shell/command_remote_mount.go @@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error { func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() _, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 53d043ebf..a3433621e 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name)) - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { _, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ Directory: string(dir), Entry: entry, diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go index c947a19e6..49b07f8f1 100644 --- a/weed/shell/command_remote_unmount.go +++ b/weed/shell/command_remote_unmount.go @@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error { // find existing directory, and ensure the directory is empty - err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { parent, name := util.FullPath(dir).DirAndName() lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: parent, diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go index a512ffc4a..35ecae4ee 100644 --- a/weed/shell/command_s3_bucket_create.go +++ b/weed/shell/command_s3_bucket_create.go @@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("empty bucket name") } - err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go index 26953c249..5f8585b0b 100644 --- a/weed/shell/command_s3_bucket_delete.go +++ b/weed/shell/command_s3_bucket_delete.go @@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer } // delete the collection directly first - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: *bucketName, }) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 0c4e8d18f..a344a79a4 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i } func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index 5eab2ebd0..cefb1deeb 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io } var buf bytes.Buffer - if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf) }); err != nil && err != filer_pb.ErrNotFound { return err @@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io if *apply { - if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes()) }); err != nil { return err diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 643cccac3..cf3bc604a 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) { - err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { - return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 27cba618b..5c4c10146 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } for _, dst := range allLocations { - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ VolumeId: uint32(vid), Replication: replicaPlacement.String(), diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2885ba11f..43bf4d0f8 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co break } - err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: replica.info.Id, SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index a7a981339..7a7cdee56 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) } - return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" if vinfo.isEcVolume { diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 575051ffe..7b03b8dfa 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 796f74264..d7cc8b8ce 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), }) @@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl } }() - err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ VolumeId: uint32(volumeId), }) @@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl return } - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: string(sourceVolumeServer), @@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, @@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source } func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) @@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour } func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if writable { _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ VolumeId: uint32(volumeId), diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go index 4daa589be..029ef2201 100644 --- a/weed/shell/command_volume_server_leave.go +++ b/weed/shell/command_volume_server_leave.go @@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri } func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) { - return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{}) if leaveErr != nil { fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 57d3bf347..a2330ab8a 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index a22fe92a1..0df0790e6 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index d5cb9f07c..85bec44f7 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index 2e09a8c1b..a09bf5d56 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ return } - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{ GarbageThreshold: float32(*garbageThreshold), }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 02c0af59e..985f6423b 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error { var _ = filer_pb.FilerClient(&CommandEnv{}) -func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn) + return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn) } diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index caf8da859..0aa65f049 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -53,7 +53,7 @@ func RunShell(options ShellOptions) { if commandEnv.option.FilerAddress == "" { var filers []pb.ServerAddress - commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) @@ -75,7 +75,7 @@ func RunShell(options ShellOptions) { } if commandEnv.option.FilerAddress != "" { - commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err |
