diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_remote.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 2 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_collection.go | 4 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 2 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 14 |
9 files changed, 19 insertions, 19 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a0385f487..8e6cd8451 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -327,7 +327,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect glog.V(4).Infof("CollectionList %v", req) resp = &filer_pb.CollectionListResponse{} - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: req.IncludeNormalVolumes, IncludeEcVolumes: req.IncludeEcVolumes, @@ -348,7 +348,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet glog.V(4).Infof("DeleteCollection %v", req) - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) @@ -362,7 +362,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR var output *master_pb.StatisticsResponse - err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ Replication: req.Replication, Collection: req.Collection, diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 7f31d8cc1..3be986023 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -123,7 +123,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // tell filer to tell volume server to download into needles assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) - err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9343a9bf..1a5f80369 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -165,7 +165,7 @@ func (fs *FilerServer) checkWithMaster() { isConnected := false for !isConnected { for _, master := range fs.option.Masters { - readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index 55f3faf8c..654da6b3c 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 41a2b570b..72d4e20d7 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -27,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2659307fc..f3f99ee7b 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -26,7 +26,7 @@ func (vs *VolumeServer) GetMaster() pb.ServerAddress { func (vs *VolumeServer) checkWithMaster() (err error) { for { for _, master := range vs.SeedMasterNodes { - err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 9d9f756ce..52181a771 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -45,7 +45,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ @@ -226,7 +226,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s if receiveErr == io.EOF { break } - if resp!=nil && resp.ModifiedTsNs != 0 { + if resp != nil && resp.ModifiedTsNs != 0 { modifiedTsNs = resp.ModifiedTsNs } if receiveErr != nil { diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 861d352d7..79611f499 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -126,7 +126,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 6c1de3154..018daed8b 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -120,9 +120,9 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { var _ = filer_pb.FilerClient(&WebDavFileSystem{}) -func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) @@ -162,7 +162,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -212,7 +212,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f } dir, name := util.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -315,7 +315,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) oldDir, oldBaseName := util.FullPath(oldName).DirAndName() newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -375,7 +375,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 var fileId, host string var auth security.EncodedJwt - if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -477,7 +477,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Chunks = manifestedChunks } - flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() f.entry.Attributes.Collection = f.collection f.entry.Attributes.Replication = f.replication |
