diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_remote.go | 26 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_remote.go | 17 |
4 files changed, 23 insertions, 27 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 08b01dd09..e025e73dc 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -384,6 +384,8 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) + t := &filer_pb.GetFilerConfigurationResponse{ Masters: fs.option.Masters, Collection: fs.option.Collection, @@ -395,6 +397,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. MetricsAddress: fs.metricsAddress, MetricsIntervalSec: int32(fs.metricsIntervalSec), Version: util.Version(), + ClusterId: string(clusterId), } glog.V(4).Infof("GetFilerConfiguration: %v", t) diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 8049f4f46..8064431c5 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -6,11 +6,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "strings" + "sync" "time" ) @@ -27,7 +29,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo } // find mapping - var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation var localMountedDir string for k, loc := range mappings.Mappings { if strings.HasPrefix(req.Directory, k) { @@ -43,7 +45,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo if err != nil { return nil, err } - storageConf := &filer_pb.RemoteConf{} + storageConf := &remote_pb.RemoteConf{} if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil { return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) } @@ -79,12 +81,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo var chunks []*filer_pb.FileChunk var fetchAndWriteErr error + var wg sync.WaitGroup limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { localOffset := offset + wg.Add(1) limitedConcurrentExecutor.Execute(func() { + defer wg.Done() size := chunkSize if localOffset+chunkSize > entry.Remote.RemoteSize { size = entry.Remote.RemoteSize - localOffset @@ -114,14 +119,12 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo Cookie: uint32(fileId.Cookie), Offset: localOffset, Size: size, - RemoteType: storageConf.Type, - RemoteName: storageConf.Name, - S3AccessKey: storageConf.S3AccessKey, - S3SecretKey: storageConf.S3SecretKey, - S3Region: storageConf.S3Region, - S3Endpoint: storageConf.S3Endpoint, - RemoteBucket: remoteStorageMountedLocation.Bucket, - RemotePath: string(dest), + RemoteConf: storageConf, + RemoteLocation: &remote_pb.RemoteStorageLocation{ + Name: remoteStorageMountedLocation.Name, + Bucket: remoteStorageMountedLocation.Bucket, + Path: string(dest), + }, }) if fetchAndWriteErr != nil { return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) @@ -129,7 +132,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo return nil }) - if err != nil { + if err != nil && fetchAndWriteErr == nil { fetchAndWriteErr = err return } @@ -148,6 +151,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo }) } + wg.Wait() if fetchAndWriteErr != nil { return nil, fetchAndWriteErr } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 452c2766e..d7e4f302a 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "io" "io/ioutil" "math" @@ -12,7 +13,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -60,7 +60,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ } // write .vif files - if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { + if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 5ca6619bd..0615a96a1 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -17,25 +16,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } - remoteConf := &filer_pb.RemoteConf{ - Type: req.RemoteType, - Name: req.RemoteName, - S3AccessKey: req.S3AccessKey, - S3SecretKey: req.S3SecretKey, - S3Region: req.S3Region, - S3Endpoint: req.S3Endpoint, - } + remoteConf := req.RemoteConf client, getClientErr := remote_storage.GetRemoteStorage(remoteConf) if getClientErr != nil { return nil, fmt.Errorf("get remote client: %v", getClientErr) } - remoteStorageLocation := &filer_pb.RemoteStorageLocation{ - Name: req.RemoteName, - Bucket: req.RemoteBucket, - Path: req.RemotePath, - } + remoteStorageLocation := req.RemoteLocation + data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size) if ReadRemoteErr != nil { return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) |
