aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_cluster_ps.go2
-rw-r--r--weed/shell/command_collection_delete.go2
-rw-r--r--weed/shell/command_collection_list.go2
-rw-r--r--weed/shell/command_ec_common.go8
-rw-r--r--weed/shell/command_ec_decode.go10
-rw-r--r--weed/shell/command_ec_encode.go2
-rw-r--r--weed/shell/command_ec_rebuild.go4
-rw-r--r--weed/shell/command_fs_cat.go2
-rw-r--r--weed/shell/command_fs_configure.go2
-rw-r--r--weed/shell/command_fs_meta_cat.go2
-rw-r--r--weed/shell/command_fs_meta_load.go2
-rw-r--r--weed/shell/command_fs_mkdir.go2
-rw-r--r--weed/shell/command_fs_mv.go2
-rw-r--r--weed/shell/command_fs_rm.go2
-rw-r--r--weed/shell/command_remote_configure.go4
-rw-r--r--weed/shell/command_remote_meta_sync.go2
-rw-r--r--weed/shell/command_remote_mount.go2
-rw-r--r--weed/shell/command_remote_uncache.go2
-rw-r--r--weed/shell/command_remote_unmount.go2
-rw-r--r--weed/shell/command_s3_bucket_create.go2
-rw-r--r--weed/shell/command_s3_bucket_delete.go2
-rw-r--r--weed/shell/command_s3_bucket_list.go2
-rw-r--r--weed/shell/command_s3_configure.go4
-rw-r--r--weed/shell/command_volume_check_disk.go6
-rw-r--r--weed/shell/command_volume_configure_replication.go2
-rw-r--r--weed/shell/command_volume_fix_replication.go2
-rw-r--r--weed/shell/command_volume_fsck.go2
-rw-r--r--weed/shell/command_volume_mount.go2
-rw-r--r--weed/shell/command_volume_move.go12
-rw-r--r--weed/shell/command_volume_server_leave.go2
-rw-r--r--weed/shell/command_volume_tier_download.go2
-rw-r--r--weed/shell/command_volume_tier_upload.go2
-rw-r--r--weed/shell/command_volume_unmount.go2
-rw-r--r--weed/shell/command_volume_vacuum.go2
-rw-r--r--weed/shell/commands.go4
-rw-r--r--weed/shell/shell_liner.go4
36 files changed, 55 insertions, 55 deletions
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 5ed1677c8..2d391de1d 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -36,7 +36,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index 8942c15da..55c8ddf19 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -53,7 +53,7 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *collectionName,
})
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index ba502a6b9..b2aa6d2f4 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -62,7 +62,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 51c8c32cd..765c0ad89 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -61,7 +61,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
- err = operation.WithVolumeServerClient(targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetAddress != existingLocation {
@@ -241,7 +241,7 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -256,7 +256,7 @@ func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, s
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
@@ -269,7 +269,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index b2ca605c7..288fa4945 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -116,7 +116,7 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
@@ -149,7 +149,7 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
@@ -187,7 +187,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
@@ -223,7 +223,7 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
var resp *master_pb.LookupVolumeResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
return err
})
@@ -236,7 +236,7 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 6add14749..fcdee264e 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -126,7 +126,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 409ec4329..f5d1166d2 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -172,7 +172,7 @@ func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection st
func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
- err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
@@ -213,7 +213,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ copyErr = operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(rebuilder.info), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index a5731240d..17e9c6550 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -41,7 +41,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index ece805db3..73bb8e5c6 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -117,7 +117,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcSeaweedFS, filer.FilerConfName, buf2.Bytes())
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index e0525defa..a7de6d3ef 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
dir, name := util.FullPath(path).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 46dc07e9a..7fe4cf809 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -48,7 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
diff --git a/weed/shell/command_fs_mkdir.go b/weed/shell/command_fs_mkdir.go
index 11b663eec..71a35cb68 100644
--- a/weed/shell/command_fs_mkdir.go
+++ b/weed/shell/command_fs_mkdir.go
@@ -36,7 +36,7 @@ func (c *commandFsMkdir) Do(args []string, commandEnv *CommandEnv, writer io.Wri
dir, name := util.FullPath(path).DirAndName()
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, createErr := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{
Directory: dir,
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index 2448c8f61..612cdc84e 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -54,7 +54,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
destinationDir, destinationName := util.FullPath(destinationPath).DirAndName()
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
diff --git a/weed/shell/command_fs_rm.go b/weed/shell/command_fs_rm.go
index b383366ca..2ce3275bb 100644
--- a/weed/shell/command_fs_rm.go
+++ b/weed/shell/command_fs_rm.go
@@ -56,7 +56,7 @@ func (c *commandFsRm) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return fmt.Errorf("need to have arguments")
}
- commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
for _, entry := range entiries {
targetPath, err := commandEnv.parseUrl(entry)
if err != nil {
diff --git a/weed/shell/command_remote_configure.go b/weed/shell/command_remote_configure.go
index 3fa905237..c892c3443 100644
--- a/weed/shell/command_remote_configure.go
+++ b/weed/shell/command_remote_configure.go
@@ -184,7 +184,7 @@ func (c *commandRemoteConfigure) listExistingRemoteStorages(commandEnv *CommandE
func (c *commandRemoteConfigure) deleteRemoteStorage(commandEnv *CommandEnv, writer io.Writer, storageName string) error {
- return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: filer.DirectoryEtcRemote,
@@ -214,7 +214,7 @@ func (c *commandRemoteConfigure) saveRemoteStorage(commandEnv *CommandEnv, write
return err
}
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.DirectoryEtcRemote, conf.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, data)
}); err != nil && err != filer_pb.ErrNotFound {
return err
diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go
index 277c4c2be..e09a66761 100644
--- a/weed/shell/command_remote_meta_sync.go
+++ b/weed/shell/command_remote_meta_sync.go
@@ -117,7 +117,7 @@ func pullMetadata(commandEnv *CommandEnv, writer io.Writer, localMountedDir util
remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToCache)
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 2b57db707..019bc93c9 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -117,7 +117,7 @@ func jsonPrintln(writer io.Writer, message proto.Message) error {
func syncMetadata(commandEnv *CommandEnv, writer io.Writer, dir string, nonEmpty bool, remoteConf *remote_pb.RemoteConf, remote *remote_pb.RemoteStorageLocation) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
_, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
index 53d043ebf..a3433621e 100644
--- a/weed/shell/command_remote_uncache.go
+++ b/weed/shell/command_remote_uncache.go
@@ -105,7 +105,7 @@ func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "Uncache %+v ... ", dir.Child(entry.Name))
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, updateErr := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
Directory: string(dir),
Entry: entry,
diff --git a/weed/shell/command_remote_unmount.go b/weed/shell/command_remote_unmount.go
index c947a19e6..49b07f8f1 100644
--- a/weed/shell/command_remote_unmount.go
+++ b/weed/shell/command_remote_unmount.go
@@ -83,7 +83,7 @@ func (c *commandRemoteUnmount) Do(args []string, commandEnv *CommandEnv, writer
func (c *commandRemoteUnmount) purgeMountedData(commandEnv *CommandEnv, dir string) error {
// find existing directory, and ensure the directory is empty
- err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
parent, name := util.FullPath(dir).DirAndName()
lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: parent,
diff --git a/weed/shell/command_s3_bucket_create.go b/weed/shell/command_s3_bucket_create.go
index a512ffc4a..35ecae4ee 100644
--- a/weed/shell/command_s3_bucket_create.go
+++ b/weed/shell/command_s3_bucket_create.go
@@ -45,7 +45,7 @@ func (c *commandS3BucketCreate) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("empty bucket name")
}
- err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 26953c249..5f8585b0b 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -52,7 +52,7 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
}
// delete the collection directly first
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: *bucketName,
})
diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go
index 0c4e8d18f..a344a79a4 100644
--- a/weed/shell/command_s3_bucket_list.go
+++ b/weed/shell/command_s3_bucket_list.go
@@ -65,7 +65,7 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i
}
func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index 5eab2ebd0..cefb1deeb 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -48,7 +48,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
}
var buf bytes.Buffer
- if err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.ReadEntry(commandEnv.MasterClient, client, filer.IamConfigDirecotry, filer.IamIdentityFile, &buf)
}); err != nil && err != filer_pb.ErrNotFound {
return err
@@ -171,7 +171,7 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
if *apply {
- if err := commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer.SaveInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile, buf.Bytes())
}); err != nil {
return err
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 643cccac3..cf3bc604a 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -182,7 +182,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {
- err = operation.WithVolumeServerClient(sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -200,7 +200,7 @@ func (c *commandVolumeCheckDisk) readSourceNeedleBlob(sourceVolumeServer pb.Serv
func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error {
- return operation.WithVolumeServerClient(targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
VolumeId: volumeId,
NeedleId: uint64(needleValue.Key),
@@ -229,7 +229,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {
- return operation.WithVolumeServerClient(volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 27cba618b..5c4c10146 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -86,7 +86,7 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 2885ba11f..43bf4d0f8 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -265,7 +265,7 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
- err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index a7a981339..7a7cdee56 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -248,7 +248,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
}
- return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ext := ".idx"
if vinfo.isEcVolume {
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 575051ffe..7b03b8dfa 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
}
func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 796f74264..d7cc8b8ce 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -112,7 +112,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- clientErr := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
})
@@ -123,7 +123,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
}
}()
- err = operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
VolumeId: uint32(volumeId),
})
@@ -140,7 +140,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
return
}
- err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: string(sourceVolumeServer),
@@ -173,7 +173,7 @@ func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needl
func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
- return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
@@ -186,7 +186,7 @@ func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source
}
func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
@@ -195,7 +195,7 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
}
func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable bool) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if writable {
_, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
VolumeId: uint32(volumeId),
diff --git a/weed/shell/command_volume_server_leave.go b/weed/shell/command_volume_server_leave.go
index 4daa589be..029ef2201 100644
--- a/weed/shell/command_volume_server_leave.go
+++ b/weed/shell/command_volume_server_leave.go
@@ -56,7 +56,7 @@ func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, wri
}
func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
- return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
if leaveErr != nil {
fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 57d3bf347..a2330ab8a 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -124,7 +124,7 @@ func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection s
func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer pb.ServerAddress) error {
- err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index a22fe92a1..0df0790e6 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -118,7 +118,7 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress, dest string, keepLocalDatFile bool) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index d5cb9f07c..85bec44f7 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -55,7 +55,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
}
func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go
index 2e09a8c1b..a09bf5d56 100644
--- a/weed/shell/command_volume_vacuum.go
+++ b/weed/shell/command_volume_vacuum.go
@@ -39,7 +39,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ
return
}
- err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err = client.VacuumVolume(context.Background(), &master_pb.VacuumVolumeRequest{
GarbageThreshold: float32(*garbageThreshold),
})
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 02c0af59e..985f6423b 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -97,9 +97,9 @@ func (ce *CommandEnv) checkDirectory(path string) error {
var _ = filer_pb.FilerClient(&CommandEnv{})
-func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithGrpcFilerClient(ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
+ return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index caf8da859..0aa65f049 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -53,7 +53,7 @@ func RunShell(options ShellOptions) {
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress
- commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
@@ -75,7 +75,7 @@ func RunShell(options ShellOptions) {
}
if commandEnv.option.FilerAddress != "" {
- commandEnv.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err