aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/filer_grpc_server_remote.go2
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/master_grpc_server_collection.go4
-rw-r--r--weed/server/master_server_handlers_admin.go2
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_grpc_copy.go4
-rw-r--r--weed/server/volume_grpc_erasure_coding.go2
-rw-r--r--weed/server/webdav_server.go14
9 files changed, 19 insertions, 19 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a0385f487..8e6cd8451 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -327,7 +327,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect
glog.V(4).Infof("CollectionList %v", req)
resp = &filer_pb.CollectionListResponse{}
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: req.IncludeNormalVolumes,
IncludeEcVolumes: req.IncludeEcVolumes,
@@ -348,7 +348,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
glog.V(4).Infof("DeleteCollection %v", req)
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
@@ -362,7 +362,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
var output *master_pb.StatisticsResponse
- err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
Replication: req.Replication,
Collection: req.Collection,
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 7f31d8cc1..3be986023 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -123,7 +123,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// tell filer to tell volume server to download into needles
assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
- err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c9343a9bf..1a5f80369 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -165,7 +165,7 @@ func (fs *FilerServer) checkWithMaster() {
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
- readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index 55f3faf8c..654da6b3c 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
@@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
- err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 41a2b570b..72d4e20d7 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -27,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 2659307fc..f3f99ee7b 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -26,7 +26,7 @@ func (vs *VolumeServer) GetMaster() pb.ServerAddress {
func (vs *VolumeServer) checkWithMaster() (err error) {
for {
for _, master := range vs.SeedMasterNodes {
- err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 9d9f756ce..52181a771 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -45,7 +45,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
- err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
@@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
if receiveErr == io.EOF {
break
}
- if resp!=nil && resp.ModifiedTsNs != 0 {
+ if resp != nil && resp.ModifiedTsNs != 0 {
modifiedTsNs = resp.ModifiedTsNs
}
if receiveErr != nil {
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 861d352d7..79611f499 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -126,7 +126,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 6c1de3154..018daed8b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -120,9 +120,9 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
var _ = filer_pb.FilerClient(&WebDavFileSystem{})
-func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
@@ -162,7 +162,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -212,7 +212,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
}
dir, name := util.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
@@ -315,7 +315,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
newDir, newBaseName := util.FullPath(newName).DirAndName()
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -375,7 +375,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
var fileId, host string
var auth security.EncodedJwt
- if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
@@ -477,7 +477,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.entry.Chunks = manifestedChunks
}
- flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
f.entry.Attributes.Collection = f.collection
f.entry.Attributes.Replication = f.replication