aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
committerchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
commit9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch)
tree1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/shell
parentc935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff)
downloadseaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz
seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_cluster_ps.go2
-rw-r--r--weed/shell/command_collection_delete.go2
-rw-r--r--weed/shell/command_collection_list.go2
-rw-r--r--weed/shell/command_ec_common.go8
-rw-r--r--weed/shell/command_ec_decode.go10
-rw-r--r--weed/shell/command_ec_encode.go2
-rw-r--r--weed/shell/command_ec_rebuild.go4
-rw-r--r--weed/shell/command_fs_cat.go2
-rw-r--r--weed/shell/command_fs_configure.go2
-rw-r--r--weed/shell/command_fs_meta_cat.go2
-rw-r--r--weed/shell/command_fs_meta_load.go2
-rw-r--r--weed/shell/command_fs_mkdir.go2
-rw-r--r--weed/shell/command_fs_mv.go2
-rw-r--r--weed/shell/command_fs_rm.go2
-rw-r--r--weed/shell/command_remote_configure.go4
-rw-r--r--weed/shell/command_remote_meta_sync.go2
-rw-r--r--weed/shell/command_remote_mount.go2
-rw-r--r--weed/shell/command_remote_uncache.go2
-rw-r--r--weed/shell/command_remote_unmount.go2
-rw-r--r--weed/shell/command_s3_bucket_create.go2
-rw-r--r--weed/shell/command_s3_bucket_delete.go2
-rw-r--r--weed/shell/command_s3_bucket_list.go2
-rw-r--r--weed/shell/command_s3_configure.go4
-rw-r--r--weed/shell/command_volume_check_disk.go6
-rw-r--r--weed/shell/command_volume_configure_replication.go2
-rw-r--r--weed/shell/command_volume_fix_replication.go2
-rw-r--r--weed/shell/command_volume_fsck.go2
-rw-r--r--weed/shell/command_volume_mount.go2
-rw-r--r--weed/shell/command_volume_move.go12
-rw-r--r--weed/shell/command_volume_server_leave.go2
-rw-r--r--weed/shell/command_volume_tier_download.go2
-rw-r--r--weed/shell/command_volume_tier_upload.go2
-rw-r--r--weed/shell/command_volume_unmount.go2
-rw-r--r--weed/shell/command_volume_vacuum.go2
-rw-r--r--weed/shell/commands.go4
-rw-r--r--weed/shell/shell_liner.go4
36 files changed, 55 insertions, 55 deletions
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 5ed1677c8..2d391de1d 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 8942c15da..55c8ddf19 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *collectionName,
})
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index ba502a6b9..b2aa6d2f4 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 51c8c32cd..765c0ad89 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
- err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetAddress != existingLocation {
@@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
@@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index b2ca605c7..288fa4945 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
@@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
@@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
@@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
var resp *master_pb.LookupVolumeResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
return err
})
@@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 6add14749..fcdee264e 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 409ec4329..f5d1166d2 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
- err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index a5731240d..17e9c6550 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index ece805db3..73bb8e5c6 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index e0525defa..a7de6d3ef 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 46dc07e9a..7fe4cf809 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go
index 11b663eec..71a35cb68 100644
--- a/weed/shell/command_fs_mkdir.go
+++ b/weed/shell/command_fs_mkdir.go
@@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri
dir, name := util.FullPath(path).DirAndName()
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: dir,
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index 2448c8f61..612cdc84e 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
destinationDir, destinationName := util.FullPath(destinationPath).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
index b383366ca..2ce3275bb 100644
--- a/weed/shell/command_fs_rm.go
+++ b/weed/shell/command_fs_rm.go
@@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return fmt.Errorf("need to have arguments")
}
- commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
for _, entry := range entiries {
targetPath, err := commandEnv.parseUrl(entry)
if err != nil {
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index 3fa905237..c892c3443 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error {
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: filer.DirectoryEtcRemote,
@@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write
return err
}
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data)
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go
index 277c4c2be..e09a66761 100644
--- a/weed/shell/command_remote_meta_sync.go
+++ b/weed/shell/command_remote_meta_sync.go
@@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util
remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 2b57db707..019bc93c9 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error {
func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
_, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
index 53d043ebf..a3433621e 100644
--- a/weed/shell/command_remote_uncache.go
+++ b/weed/shell/command_remote_uncache.go
@@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name))
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: string(dir),
Entry: entry,
diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go
index c947a19e6..49b07f8f1 100644
--- a/weed/shell/command_remote_unmount.go
+++ b/weed/shell/command_remote_unmount.go
@@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer
func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index a512ffc4a..35ecae4ee 100644
--- a/weed/shell/command_s3_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("empty bucket name")
}
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 26953c249..5f8585b0b 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
}
// delete the collection directly first
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *bucketName,
})
diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index 0c4e8d18f..a344a79a4 100644
--- a/weed/shell/command_s3_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
}
func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index 5eab2ebd0..cefb1deeb 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
}
var buf bytes.Buffer
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf)
}); err != nil && err != filer_pb.ErrNotFound {
return err
@@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
}); err != nil {
return err
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 643cccac3..cf3bc604a 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
- err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv
func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
- return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
- return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 27cba618b..5c4c10146 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 2885ba11f..43bf4d0f8 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index a7a981339..7a7cdee56 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
}
- return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
if vinfo.isEcVolume {
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 575051ffe..7b03b8dfa 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
}
func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 796f74264..d7cc8b8ce 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
})
@@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
}
}()
- err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(volumeId),
})
@@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
@@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
- return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
@@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
}
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
@@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
}
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
index 4daa589be..029ef2201 100644
--- a/weed/shell/command_volume_server_leave.go
+++ b/weed/shell/command_volume_server_leave.go
@@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri
}
func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
- return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {
fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 57d3bf347..a2330ab8a 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error {
- err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index a22fe92a1..0df0790e6 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index d5cb9f07c..85bec44f7 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
}
func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go
index 2e09a8c1b..a09bf5d56 100644
--- a/weed/shell/command_volume_vacuum.go
+++ b/weed/shell/command_volume_vacuum.go
@@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
return
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
})
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 02c0af59e..985f6423b 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error {
var _ = filer_pb.FilerClient(&CommandEnv{})
-func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
+ return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index caf8da859..0aa65f049 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -53,7 +53,7 @@ func RunShell(options ShellOptions) {
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress
- commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
@@ -75,7 +75,7 @@ func RunShell(options ShellOptions) {
}
if commandEnv.option.FilerAddress != "" {
- commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err